GRPCAsyncServerHandlerTests.swift 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #if compiler(>=5.6)
  17. @testable import GRPC
  18. import NIOCore
  19. import NIOEmbedded
  20. import NIOHPACK
  21. import NIOPosix
  22. import XCTest
  23. // MARK: - Tests
  24. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
  25. class AsyncServerHandlerTests: GRPCTestCase {
  26. private let recorder = AsyncResponseStream()
  27. private var group: EventLoopGroup!
  28. private var loop: EventLoop!
  29. override func setUp() {
  30. super.setUp()
  31. self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  32. self.loop = self.group.next()
  33. }
  34. override func tearDown() {
  35. XCTAssertNoThrow(try self.group.syncShutdownGracefully())
  36. super.tearDown()
  37. }
  38. func makeCallHandlerContext(
  39. encoding: ServerMessageEncoding = .disabled
  40. ) -> CallHandlerContext {
  41. let closeFuture = self.loop.makeSucceededVoidFuture()
  42. return CallHandlerContext(
  43. errorDelegate: nil,
  44. logger: self.logger,
  45. encoding: encoding,
  46. eventLoop: self.loop,
  47. path: "/ignored",
  48. remoteAddress: nil,
  49. responseWriter: self.recorder,
  50. allocator: ByteBufferAllocator(),
  51. closeFuture: closeFuture
  52. )
  53. }
  54. private func makeHandler(
  55. encoding: ServerMessageEncoding = .disabled,
  56. callType: GRPCCallType = .bidirectionalStreaming,
  57. observer: @escaping @Sendable (
  58. GRPCAsyncRequestStream<String>,
  59. GRPCAsyncResponseStreamWriter<String>,
  60. GRPCAsyncServerCallContext
  61. ) async throws -> Void
  62. ) -> AsyncServerHandler<StringSerializer, StringDeserializer, String, String> {
  63. return AsyncServerHandler(
  64. context: self.makeCallHandlerContext(encoding: encoding),
  65. requestDeserializer: StringDeserializer(),
  66. responseSerializer: StringSerializer(),
  67. callType: callType,
  68. interceptors: [],
  69. userHandler: observer
  70. )
  71. }
  72. @Sendable
  73. private static func echo(
  74. requests: GRPCAsyncRequestStream<String>,
  75. responseStreamWriter: GRPCAsyncResponseStreamWriter<String>,
  76. context: GRPCAsyncServerCallContext
  77. ) async throws {
  78. for try await message in requests {
  79. try await responseStreamWriter.send(message)
  80. }
  81. }
  82. @Sendable
  83. private static func neverReceivesMessage(
  84. requests: GRPCAsyncRequestStream<String>,
  85. responseStreamWriter: GRPCAsyncResponseStreamWriter<String>,
  86. context: GRPCAsyncServerCallContext
  87. ) async throws {
  88. for try await message in requests {
  89. XCTFail("Unexpected message: '\(message)'")
  90. }
  91. }
  92. @Sendable
  93. private static func neverCalled(
  94. requests: GRPCAsyncRequestStream<String>,
  95. responseStreamWriter: GRPCAsyncResponseStreamWriter<String>,
  96. context: GRPCAsyncServerCallContext
  97. ) async throws {
  98. XCTFail("This observer should never be called")
  99. }
  100. func testHappyPath() async throws {
  101. let handler = self.makeHandler(
  102. observer: Self.echo(requests:responseStreamWriter:context:)
  103. )
  104. defer {
  105. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  106. }
  107. self.loop.execute {
  108. handler.receiveMetadata([:])
  109. handler.receiveMessage(ByteBuffer(string: "1"))
  110. handler.receiveMessage(ByteBuffer(string: "2"))
  111. handler.receiveMessage(ByteBuffer(string: "3"))
  112. handler.receiveEnd()
  113. }
  114. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  115. try await responseStream.next().assertMetadata()
  116. for expected in ["1", "2", "3"] {
  117. try await responseStream.next().assertMessage { buffer, metadata in
  118. XCTAssertEqual(buffer, .init(string: expected))
  119. XCTAssertFalse(metadata.compress)
  120. }
  121. }
  122. try await responseStream.next().assertStatus { status, _ in
  123. XCTAssertEqual(status.code, .ok)
  124. }
  125. try await responseStream.next().assertNil()
  126. }
  127. func testHappyPathWithCompressionEnabled() async throws {
  128. let handler = self.makeHandler(
  129. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  130. observer: Self.echo(requests:responseStreamWriter:context:)
  131. )
  132. defer {
  133. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  134. }
  135. self.loop.execute {
  136. handler.receiveMetadata([:])
  137. handler.receiveMessage(ByteBuffer(string: "1"))
  138. handler.receiveMessage(ByteBuffer(string: "2"))
  139. handler.receiveMessage(ByteBuffer(string: "3"))
  140. handler.receiveEnd()
  141. }
  142. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  143. try await responseStream.next().assertMetadata()
  144. for expected in ["1", "2", "3"] {
  145. try await responseStream.next().assertMessage { buffer, metadata in
  146. XCTAssertEqual(buffer, .init(string: expected))
  147. XCTAssertTrue(metadata.compress)
  148. }
  149. }
  150. try await responseStream.next().assertStatus()
  151. try await responseStream.next().assertNil()
  152. }
  153. func testHappyPathWithCompressionEnabledButDisabledByCaller() async throws {
  154. let handler = self.makeHandler(
  155. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  156. ) { requests, responseStreamWriter, context in
  157. try await context.response.compressResponses(false)
  158. return try await Self.echo(
  159. requests: requests,
  160. responseStreamWriter: responseStreamWriter,
  161. context: context
  162. )
  163. }
  164. defer {
  165. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  166. }
  167. self.loop.execute {
  168. handler.receiveMetadata([:])
  169. handler.receiveMessage(ByteBuffer(string: "1"))
  170. handler.receiveMessage(ByteBuffer(string: "2"))
  171. handler.receiveMessage(ByteBuffer(string: "3"))
  172. handler.receiveEnd()
  173. }
  174. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  175. try await responseStream.next().assertMetadata()
  176. for expected in ["1", "2", "3"] {
  177. try await responseStream.next().assertMessage { buffer, metadata in
  178. XCTAssertEqual(buffer, .init(string: expected))
  179. XCTAssertFalse(metadata.compress)
  180. }
  181. }
  182. try await responseStream.next().assertStatus()
  183. try await responseStream.next().assertNil()
  184. }
  185. func testResponseHeadersAndTrailersSentFromContext() async throws {
  186. let handler = self.makeHandler { _, responseStreamWriter, context in
  187. try await context.response.setHeaders(["pontiac": "bandit"])
  188. try await responseStreamWriter.send("1")
  189. try await context.response.setTrailers(["disco": "strangler"])
  190. }
  191. defer {
  192. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  193. }
  194. self.loop.execute {
  195. handler.receiveMetadata([:])
  196. handler.receiveEnd()
  197. }
  198. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  199. try await responseStream.next().assertMetadata { headers in
  200. XCTAssertEqual(headers, ["pontiac": "bandit"])
  201. }
  202. try await responseStream.next().assertMessage()
  203. try await responseStream.next().assertStatus { _, trailers in
  204. XCTAssertEqual(trailers, ["disco": "strangler"])
  205. }
  206. try await responseStream.next().assertNil()
  207. }
  208. func testThrowingDeserializer() async throws {
  209. let handler = AsyncServerHandler(
  210. context: self.makeCallHandlerContext(),
  211. requestDeserializer: ThrowingStringDeserializer(),
  212. responseSerializer: StringSerializer(),
  213. callType: .bidirectionalStreaming,
  214. interceptors: [],
  215. userHandler: Self.neverReceivesMessage(requests:responseStreamWriter:context:)
  216. )
  217. defer {
  218. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  219. }
  220. self.loop.execute {
  221. handler.receiveMetadata([:])
  222. handler.receiveMessage(ByteBuffer(string: "hello"))
  223. }
  224. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  225. try await responseStream.next().assertStatus { status, _ in
  226. XCTAssertEqual(status.code, .internalError)
  227. }
  228. try await responseStream.next().assertNil()
  229. }
  230. func testThrowingSerializer() async throws {
  231. let handler = AsyncServerHandler(
  232. context: self.makeCallHandlerContext(),
  233. requestDeserializer: StringDeserializer(),
  234. responseSerializer: ThrowingStringSerializer(),
  235. callType: .bidirectionalStreaming,
  236. interceptors: [],
  237. userHandler: Self.echo(requests:responseStreamWriter:context:)
  238. )
  239. defer {
  240. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  241. }
  242. self.loop.execute {
  243. handler.receiveMetadata([:])
  244. handler.receiveMessage(ByteBuffer(string: "hello"))
  245. }
  246. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  247. try await responseStream.next().assertMetadata()
  248. try await responseStream.next().assertStatus { status, _ in
  249. XCTAssertEqual(status.code, .internalError)
  250. }
  251. try await responseStream.next().assertNil()
  252. }
  253. func testReceiveMessageBeforeHeaders() async throws {
  254. let handler = self.makeHandler(
  255. observer: Self.neverCalled(requests:responseStreamWriter:context:)
  256. )
  257. defer {
  258. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  259. }
  260. self.loop.execute {
  261. handler.receiveMessage(ByteBuffer(string: "foo"))
  262. }
  263. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  264. try await responseStream.next().assertStatus { status, _ in
  265. XCTAssertEqual(status.code, .internalError)
  266. }
  267. try await responseStream.next().assertNil()
  268. }
  269. func testReceiveMultipleHeaders() async throws {
  270. let handler = self.makeHandler(
  271. observer: Self.neverReceivesMessage(requests:responseStreamWriter:context:)
  272. )
  273. defer {
  274. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  275. }
  276. self.loop.execute {
  277. handler.receiveMetadata([:])
  278. handler.receiveMetadata([:])
  279. }
  280. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  281. try await responseStream.next().assertStatus { status, _ in
  282. XCTAssertEqual(status.code, .internalError)
  283. }
  284. try await responseStream.next().assertNil()
  285. }
  286. func testFinishBeforeStarting() async throws {
  287. let handler = self.makeHandler(
  288. observer: Self.neverCalled(requests:responseStreamWriter:context:)
  289. )
  290. self.loop.execute {
  291. handler.finish()
  292. }
  293. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  294. try await responseStream.next().assertStatus()
  295. try await responseStream.next().assertNil()
  296. }
  297. func testFinishAfterHeaders() async throws {
  298. let handler = self.makeHandler(
  299. observer: Self.neverReceivesMessage(requests:responseStreamWriter:context:)
  300. )
  301. self.loop.execute {
  302. handler.receiveMetadata([:])
  303. handler.finish()
  304. }
  305. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  306. try await responseStream.next().assertStatus()
  307. try await responseStream.next().assertNil()
  308. }
  309. func testFinishAfterMessage() async throws {
  310. let handler = self.makeHandler(observer: Self.echo(requests:responseStreamWriter:context:))
  311. self.loop.execute {
  312. handler.receiveMetadata([:])
  313. handler.receiveMessage(ByteBuffer(string: "hello"))
  314. }
  315. // Await the metadata and message so we know the user function is running.
  316. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  317. try await responseStream.next().assertMetadata()
  318. try await responseStream.next().assertMessage()
  319. // Finish, i.e. terminate early.
  320. self.loop.execute {
  321. handler.finish()
  322. }
  323. try await responseStream.next().assertStatus { status, _ in
  324. XCTAssertEqual(status.code, .internalError)
  325. }
  326. try await responseStream.next().assertNil()
  327. }
  328. func testErrorAfterHeaders() async throws {
  329. let handler = self.makeHandler(observer: Self.echo(requests:responseStreamWriter:context:))
  330. self.loop.execute {
  331. handler.receiveMetadata([:])
  332. handler.receiveError(CancellationError())
  333. }
  334. // We don't send a message so we don't expect any responses. As metadata is sent lazily on the
  335. // first message we don't expect to get metadata back either.
  336. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  337. try await responseStream.next().assertStatus { status, _ in
  338. XCTAssertEqual(status.code, .unavailable)
  339. }
  340. try await responseStream.next().assertNil()
  341. }
  342. func testErrorAfterMessage() async throws {
  343. let handler = self.makeHandler(observer: Self.echo(requests:responseStreamWriter:context:))
  344. self.loop.execute {
  345. handler.receiveMetadata([:])
  346. handler.receiveMessage(ByteBuffer(string: "hello"))
  347. }
  348. // Wait the metadata and message; i.e. for function to have been invoked.
  349. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  350. try await responseStream.next().assertMetadata()
  351. try await responseStream.next().assertMessage()
  352. // Throw in an error.
  353. self.loop.execute {
  354. handler.receiveError(CancellationError())
  355. }
  356. // The RPC should end.
  357. try await responseStream.next().assertStatus { status, _ in
  358. XCTAssertEqual(status.code, .unavailable)
  359. }
  360. try await responseStream.next().assertNil()
  361. }
  362. func testHandlerThrowsGRPCStatusOKResultsInUnknownStatus() async throws {
  363. // Create a user function that immediately throws GRPCStatus.ok.
  364. let handler = self.makeHandler { _, _, _ in
  365. throw GRPCStatus.ok
  366. }
  367. // Send some metadata to trigger the creation of the async task with the user function.
  368. self.loop.execute {
  369. handler.receiveMetadata([:])
  370. }
  371. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  372. try await responseStream.next().assertStatus { status, _ in
  373. XCTAssertEqual(status.code, .unknown)
  374. }
  375. try await responseStream.next().assertNil()
  376. }
  377. func testUnaryHandlerReceivingMultipleMessages() async throws {
  378. @Sendable
  379. func neverCalled(_: String, _: GRPCAsyncServerCallContext) async throws -> String {
  380. XCTFail("Should not be called")
  381. return ""
  382. }
  383. let handler = GRPCAsyncServerHandler(
  384. context: self.makeCallHandlerContext(),
  385. requestDeserializer: StringDeserializer(),
  386. responseSerializer: StringSerializer(),
  387. interceptors: [],
  388. wrapping: neverCalled(_:_:)
  389. )
  390. defer {
  391. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  392. }
  393. self.loop.execute {
  394. handler.receiveMetadata([:])
  395. handler.receiveMessage(ByteBuffer(string: "1"))
  396. handler.receiveMessage(ByteBuffer(string: "2"))
  397. }
  398. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  399. try await responseStream.next().assertStatus { status, _ in
  400. XCTAssertEqual(status.code, .internalError)
  401. }
  402. }
  403. func testServerStreamingHandlerReceivingMultipleMessages() async throws {
  404. @Sendable
  405. func neverCalled(
  406. _: String,
  407. _: GRPCAsyncResponseStreamWriter<String>,
  408. _: GRPCAsyncServerCallContext
  409. ) async throws {
  410. XCTFail("Should not be called")
  411. }
  412. let handler = GRPCAsyncServerHandler(
  413. context: self.makeCallHandlerContext(),
  414. requestDeserializer: StringDeserializer(),
  415. responseSerializer: StringSerializer(),
  416. interceptors: [],
  417. wrapping: neverCalled(_:_:_:)
  418. )
  419. defer {
  420. XCTAssertNoThrow(try self.loop.submit { handler.finish() }.wait())
  421. }
  422. self.loop.execute {
  423. handler.receiveMetadata([:])
  424. handler.receiveMessage(ByteBuffer(string: "1"))
  425. handler.receiveMessage(ByteBuffer(string: "2"))
  426. }
  427. let responseStream = self.recorder.responseSequence.makeAsyncIterator()
  428. try await responseStream.next().assertStatus { status, _ in
  429. XCTAssertEqual(status.code, .internalError)
  430. }
  431. }
  432. }
  433. internal final class AsyncResponseStream: GRPCServerResponseWriter {
  434. private let source: PassthroughMessageSource<GRPCServerResponsePart<ByteBuffer>, Never>
  435. internal var responseSequence: PassthroughMessageSequence<
  436. GRPCServerResponsePart<ByteBuffer>,
  437. Never
  438. > {
  439. return .init(consuming: self.source)
  440. }
  441. init() {
  442. self.source = PassthroughMessageSource()
  443. }
  444. func sendMetadata(
  445. _ metadata: HPACKHeaders,
  446. flush: Bool,
  447. promise: EventLoopPromise<Void>?
  448. ) {
  449. self.source.yield(.metadata(metadata))
  450. promise?.succeed(())
  451. }
  452. func sendMessage(
  453. _ bytes: ByteBuffer,
  454. metadata: MessageMetadata,
  455. promise: EventLoopPromise<Void>?
  456. ) {
  457. self.source.yield(.message(bytes, metadata))
  458. promise?.succeed(())
  459. }
  460. func sendEnd(
  461. status: GRPCStatus,
  462. trailers: HPACKHeaders,
  463. promise: EventLoopPromise<Void>?
  464. ) {
  465. self.source.yield(.end(status, trailers))
  466. self.source.finish()
  467. promise?.succeed(())
  468. }
  469. func stopRecording() {
  470. self.source.finish()
  471. }
  472. }
  473. extension Optional where Wrapped == GRPCServerResponsePart<ByteBuffer> {
  474. func assertNil() {
  475. XCTAssertNil(self)
  476. }
  477. func assertMetadata(_ verify: (HPACKHeaders) -> Void = { _ in }) {
  478. switch self {
  479. case let .some(.metadata(headers)):
  480. verify(headers)
  481. default:
  482. XCTFail("Expected metadata but value was \(String(describing: self))")
  483. }
  484. }
  485. func assertMessage(_ verify: (ByteBuffer, MessageMetadata) -> Void = { _, _ in }) {
  486. switch self {
  487. case let .some(.message(buffer, metadata)):
  488. verify(buffer, metadata)
  489. default:
  490. XCTFail("Expected message but value was \(String(describing: self))")
  491. }
  492. }
  493. func assertStatus(_ verify: (GRPCStatus, HPACKHeaders) -> Void = { _, _ in }) {
  494. switch self {
  495. case let .some(.end(status, trailers)):
  496. verify(status, trailers)
  497. default:
  498. XCTFail("Expected status but value was \(String(describing: self))")
  499. }
  500. }
  501. }
  502. #endif