UnaryServerHandlerTests.swift 34 KB


  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOEmbedded
  18. import NIOHPACK
  19. import XCTest
  20. @testable import GRPC
  21. // MARK: - Utils
  22. final class ResponseRecorder: GRPCServerResponseWriter {
  23. var metadata: HPACKHeaders?
  24. var messages: [ByteBuffer] = []
  25. var messageMetadata: [MessageMetadata] = []
  26. var status: GRPCStatus?
  27. var trailers: HPACKHeaders?
  28. func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
  29. XCTAssertNil(self.metadata)
  30. self.metadata = metadata
  31. promise?.succeed(())
  32. self.recordedMetadataPromise.succeed(())
  33. }
  34. func sendMessage(
  35. _ bytes: ByteBuffer,
  36. metadata: MessageMetadata,
  37. promise: EventLoopPromise<Void>?
  38. ) {
  39. self.messages.append(bytes)
  40. self.messageMetadata.append(metadata)
  41. promise?.succeed(())
  42. self.recordedMessagePromise.succeed(())
  43. }
  44. func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  45. XCTAssertNil(self.status)
  46. XCTAssertNil(self.trailers)
  47. self.status = status
  48. self.trailers = trailers
  49. promise?.succeed(())
  50. self.recordedEndPromise.succeed(())
  51. }
  52. var recordedMetadataPromise: EventLoopPromise<Void>
  53. var recordedMessagePromise: EventLoopPromise<Void>
  54. var recordedEndPromise: EventLoopPromise<Void>
  55. init(eventLoop: EventLoop) {
  56. self.recordedMetadataPromise = eventLoop.makePromise()
  57. self.recordedMessagePromise = eventLoop.makePromise()
  58. self.recordedEndPromise = eventLoop.makePromise()
  59. }
  60. deinit {
  61. struct RecordedDidNotIntercept: Error {}
  62. self.recordedMetadataPromise.fail(RecordedDidNotIntercept())
  63. self.recordedMessagePromise.fail(RecordedDidNotIntercept())
  64. self.recordedEndPromise.fail(RecordedDidNotIntercept())
  65. }
  66. }
  67. class ServerHandlerTestCaseBase: GRPCTestCase {
  68. let eventLoop = EmbeddedEventLoop()
  69. let allocator = ByteBufferAllocator()
  70. var recorder: ResponseRecorder!
  71. override func setUp() {
  72. super.setUp()
  73. self.recorder = ResponseRecorder(eventLoop: self.eventLoop)
  74. }
  75. func makeCallHandlerContext(encoding: ServerMessageEncoding = .disabled) -> CallHandlerContext {
  76. return CallHandlerContext(
  77. errorDelegate: nil,
  78. logger: self.logger,
  79. encoding: encoding,
  80. eventLoop: self.eventLoop,
  81. path: "/ignored",
  82. remoteAddress: nil,
  83. responseWriter: self.recorder,
  84. allocator: self.allocator,
  85. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  86. )
  87. }
  88. }
  89. // MARK: - Unary
  90. class UnaryServerHandlerTests: ServerHandlerTestCaseBase {
  91. private func makeHandler(
  92. encoding: ServerMessageEncoding = .disabled,
  93. function: @escaping (String, StatusOnlyCallContext) -> EventLoopFuture<String>
  94. ) -> UnaryServerHandler<StringSerializer, StringDeserializer> {
  95. return UnaryServerHandler(
  96. context: self.makeCallHandlerContext(encoding: encoding),
  97. requestDeserializer: StringDeserializer(),
  98. responseSerializer: StringSerializer(),
  99. interceptors: [],
  100. userFunction: function
  101. )
  102. }
  103. private func echo(_ request: String, context: StatusOnlyCallContext) -> EventLoopFuture<String> {
  104. return context.eventLoop.makeSucceededFuture(request)
  105. }
  106. private func neverComplete(
  107. _ request: String,
  108. context: StatusOnlyCallContext
  109. ) -> EventLoopFuture<String> {
  110. let scheduled = context.eventLoop.scheduleTask(deadline: .distantFuture) {
  111. return request
  112. }
  113. return scheduled.futureResult
  114. }
  115. private func neverCalled(
  116. _ request: String,
  117. context: StatusOnlyCallContext
  118. ) -> EventLoopFuture<String> {
  119. XCTFail("Unexpected function invocation")
  120. return context.eventLoop.makeFailedFuture(GRPCError.InvalidState(""))
  121. }
  122. func testHappyPath() {
  123. let handler = self.makeHandler(function: self.echo(_:context:))
  124. handler.receiveMetadata([:])
  125. assertThat(self.recorder.metadata, .is([:]))
  126. let buffer = ByteBuffer(string: "hello")
  127. handler.receiveMessage(buffer)
  128. handler.receiveEnd()
  129. handler.finish()
  130. assertThat(self.recorder.messages.first, .is(buffer))
  131. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  132. assertThat(self.recorder.status, .some(.hasCode(.ok)))
  133. assertThat(self.recorder.trailers, .is([:]))
  134. }
  135. func testHappyPathWithCompressionEnabled() {
  136. let handler = self.makeHandler(
  137. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  138. function: self.echo(_:context:)
  139. )
  140. handler.receiveMetadata([:])
  141. let buffer = ByteBuffer(string: "hello")
  142. handler.receiveMessage(buffer)
  143. assertThat(self.recorder.messages.first, .is(buffer))
  144. assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  145. }
  146. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  147. let handler = self.makeHandler(
  148. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  149. ) { request, context in
  150. context.compressionEnabled = false
  151. return self.echo(request, context: context)
  152. }
  153. handler.receiveMetadata([:])
  154. let buffer = ByteBuffer(string: "hello")
  155. handler.receiveMessage(buffer)
  156. assertThat(self.recorder.messages.first, .is(buffer))
  157. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  158. }
  159. func testThrowingDeserializer() {
  160. let handler = UnaryServerHandler(
  161. context: self.makeCallHandlerContext(),
  162. requestDeserializer: ThrowingStringDeserializer(),
  163. responseSerializer: StringSerializer(),
  164. interceptors: [],
  165. userFunction: self.neverCalled(_:context:)
  166. )
  167. handler.receiveMetadata([:])
  168. assertThat(self.recorder.metadata, .is([:]))
  169. let buffer = ByteBuffer(string: "hello")
  170. handler.receiveMessage(buffer)
  171. assertThat(self.recorder.messages, .isEmpty())
  172. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  173. }
  174. func testThrowingSerializer() {
  175. let handler = UnaryServerHandler(
  176. context: self.makeCallHandlerContext(),
  177. requestDeserializer: StringDeserializer(),
  178. responseSerializer: ThrowingStringSerializer(),
  179. interceptors: [],
  180. userFunction: self.echo(_:context:)
  181. )
  182. handler.receiveMetadata([:])
  183. assertThat(self.recorder.metadata, .is([:]))
  184. let buffer = ByteBuffer(string: "hello")
  185. handler.receiveMessage(buffer)
  186. handler.receiveEnd()
  187. assertThat(self.recorder.messages, .isEmpty())
  188. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  189. }
  190. func testUserFunctionReturnsFailedFuture() {
  191. let handler = self.makeHandler { _, context in
  192. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  193. }
  194. handler.receiveMetadata([:])
  195. assertThat(self.recorder.metadata, .is([:]))
  196. let buffer = ByteBuffer(string: "hello")
  197. handler.receiveMessage(buffer)
  198. assertThat(self.recorder.messages, .isEmpty())
  199. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  200. assertThat(self.recorder.status?.message, .is(":("))
  201. }
  202. func testReceiveMessageBeforeHeaders() {
  203. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  204. handler.receiveMessage(ByteBuffer(string: "foo"))
  205. assertThat(self.recorder.metadata, .is(.none()))
  206. assertThat(self.recorder.messages, .isEmpty())
  207. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  208. }
  209. func testReceiveMultipleHeaders() {
  210. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  211. handler.receiveMetadata([:])
  212. assertThat(self.recorder.metadata, .is([:]))
  213. handler.receiveMetadata([:])
  214. assertThat(self.recorder.messages, .isEmpty())
  215. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  216. }
  217. func testReceiveMultipleMessages() {
  218. let handler = self.makeHandler(function: self.neverComplete(_:context:))
  219. handler.receiveMetadata([:])
  220. assertThat(self.recorder.metadata, .is([:]))
  221. let buffer = ByteBuffer(string: "hello")
  222. handler.receiveMessage(buffer)
  223. handler.receiveEnd()
  224. // Send another message before the function completes.
  225. handler.receiveMessage(buffer)
  226. assertThat(self.recorder.messages, .isEmpty())
  227. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  228. }
  229. func testFinishBeforeStarting() {
  230. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  231. handler.finish()
  232. assertThat(self.recorder.metadata, .is(.none()))
  233. assertThat(self.recorder.messages, .isEmpty())
  234. assertThat(self.recorder.status, .is(.none()))
  235. assertThat(self.recorder.trailers, .is(.none()))
  236. }
  237. func testFinishAfterHeaders() {
  238. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  239. handler.receiveMetadata([:])
  240. assertThat(self.recorder.metadata, .is([:]))
  241. handler.finish()
  242. assertThat(self.recorder.messages, .isEmpty())
  243. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  244. assertThat(self.recorder.trailers, .is([:]))
  245. }
  246. func testFinishAfterMessage() {
  247. let handler = self.makeHandler(function: self.neverComplete(_:context:))
  248. handler.receiveMetadata([:])
  249. handler.receiveMessage(ByteBuffer(string: "hello"))
  250. handler.finish()
  251. assertThat(self.recorder.messages, .isEmpty())
  252. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  253. assertThat(self.recorder.trailers, .is([:]))
  254. }
  255. }
  256. // MARK: - Client Streaming
  257. class ClientStreamingServerHandlerTests: ServerHandlerTestCaseBase {
  258. private func makeHandler(
  259. encoding: ServerMessageEncoding = .disabled,
  260. observerFactory: @escaping (UnaryResponseCallContext<String>)
  261. -> EventLoopFuture<(StreamEvent<String>) -> Void>
  262. ) -> ClientStreamingServerHandler<StringSerializer, StringDeserializer> {
  263. return ClientStreamingServerHandler(
  264. context: self.makeCallHandlerContext(encoding: encoding),
  265. requestDeserializer: StringDeserializer(),
  266. responseSerializer: StringSerializer(),
  267. interceptors: [],
  268. observerFactory: observerFactory
  269. )
  270. }
  271. private func joinWithSpaces(
  272. context: UnaryResponseCallContext<String>
  273. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  274. var messages: [String] = []
  275. func onEvent(_ event: StreamEvent<String>) {
  276. switch event {
  277. case let .message(message):
  278. messages.append(message)
  279. case .end:
  280. context.responsePromise.succeed(messages.joined(separator: " "))
  281. }
  282. }
  283. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  284. }
  285. private func neverReceivesMessage(
  286. context: UnaryResponseCallContext<String>
  287. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  288. func onEvent(_ event: StreamEvent<String>) {
  289. switch event {
  290. case let .message(message):
  291. XCTFail("Unexpected message: '\(message)'")
  292. case .end:
  293. context.responsePromise.succeed("")
  294. }
  295. }
  296. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  297. }
  298. private func neverCalled(
  299. context: UnaryResponseCallContext<String>
  300. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  301. XCTFail("This observer factory should never be called")
  302. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .aborted, message: nil))
  303. }
  304. func testHappyPath() {
  305. let handler = self.makeHandler(observerFactory: self.joinWithSpaces(context:))
  306. handler.receiveMetadata([:])
  307. assertThat(self.recorder.metadata, .is([:]))
  308. handler.receiveMessage(ByteBuffer(string: "1"))
  309. handler.receiveMessage(ByteBuffer(string: "2"))
  310. handler.receiveMessage(ByteBuffer(string: "3"))
  311. handler.receiveEnd()
  312. handler.finish()
  313. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  314. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  315. assertThat(self.recorder.status, .some(.hasCode(.ok)))
  316. assertThat(self.recorder.trailers, .is([:]))
  317. }
  318. func testHappyPathWithCompressionEnabled() {
  319. let handler = self.makeHandler(
  320. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  321. observerFactory: self.joinWithSpaces(context:)
  322. )
  323. handler.receiveMetadata([:])
  324. handler.receiveMessage(ByteBuffer(string: "1"))
  325. handler.receiveMessage(ByteBuffer(string: "2"))
  326. handler.receiveMessage(ByteBuffer(string: "3"))
  327. handler.receiveEnd()
  328. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  329. assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  330. }
  331. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  332. let handler = self.makeHandler(
  333. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  334. ) { context in
  335. context.compressionEnabled = false
  336. return self.joinWithSpaces(context: context)
  337. }
  338. handler.receiveMetadata([:])
  339. handler.receiveMessage(ByteBuffer(string: "1"))
  340. handler.receiveMessage(ByteBuffer(string: "2"))
  341. handler.receiveMessage(ByteBuffer(string: "3"))
  342. handler.receiveEnd()
  343. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  344. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  345. }
  346. func testThrowingDeserializer() {
  347. let handler = ClientStreamingServerHandler(
  348. context: self.makeCallHandlerContext(),
  349. requestDeserializer: ThrowingStringDeserializer(),
  350. responseSerializer: StringSerializer(),
  351. interceptors: [],
  352. observerFactory: self.neverReceivesMessage(context:)
  353. )
  354. handler.receiveMetadata([:])
  355. assertThat(self.recorder.metadata, .is([:]))
  356. let buffer = ByteBuffer(string: "hello")
  357. handler.receiveMessage(buffer)
  358. assertThat(self.recorder.messages, .isEmpty())
  359. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  360. }
  361. func testThrowingSerializer() {
  362. let handler = ClientStreamingServerHandler(
  363. context: self.makeCallHandlerContext(),
  364. requestDeserializer: StringDeserializer(),
  365. responseSerializer: ThrowingStringSerializer(),
  366. interceptors: [],
  367. observerFactory: self.joinWithSpaces(context:)
  368. )
  369. handler.receiveMetadata([:])
  370. assertThat(self.recorder.metadata, .is([:]))
  371. let buffer = ByteBuffer(string: "hello")
  372. handler.receiveMessage(buffer)
  373. handler.receiveEnd()
  374. assertThat(self.recorder.messages, .isEmpty())
  375. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  376. }
  377. func testObserverFactoryReturnsFailedFuture() {
  378. let handler = self.makeHandler { context in
  379. context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  380. }
  381. handler.receiveMetadata([:])
  382. assertThat(self.recorder.messages, .isEmpty())
  383. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  384. assertThat(self.recorder.status?.message, .is(":("))
  385. }
  386. func testDelayedObserverFactory() {
  387. let promise = self.eventLoop.makePromise(of: Void.self)
  388. let handler = self.makeHandler { context in
  389. return promise.futureResult.flatMap {
  390. self.joinWithSpaces(context: context)
  391. }
  392. }
  393. handler.receiveMetadata([:])
  394. // Queue up some messages.
  395. handler.receiveMessage(ByteBuffer(string: "1"))
  396. handler.receiveMessage(ByteBuffer(string: "2"))
  397. handler.receiveMessage(ByteBuffer(string: "3"))
  398. // Succeed the observer block.
  399. promise.succeed(())
  400. // A few more messages.
  401. handler.receiveMessage(ByteBuffer(string: "4"))
  402. handler.receiveMessage(ByteBuffer(string: "5"))
  403. handler.receiveEnd()
  404. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3 4 5")))
  405. assertThat(self.recorder.status, .some(.hasCode(.ok)))
  406. }
  407. func testDelayedObserverFactoryAllMessagesBeforeSucceeding() {
  408. let promise = self.eventLoop.makePromise(of: Void.self)
  409. let handler = self.makeHandler { context in
  410. return promise.futureResult.flatMap {
  411. self.joinWithSpaces(context: context)
  412. }
  413. }
  414. handler.receiveMetadata([:])
  415. // Queue up some messages.
  416. handler.receiveMessage(ByteBuffer(string: "1"))
  417. handler.receiveMessage(ByteBuffer(string: "2"))
  418. handler.receiveMessage(ByteBuffer(string: "3"))
  419. handler.receiveEnd()
  420. // Succeed the observer block.
  421. promise.succeed(())
  422. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  423. assertThat(self.recorder.status, .some(.hasCode(.ok)))
  424. }
  425. func testReceiveMessageBeforeHeaders() {
  426. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  427. handler.receiveMessage(ByteBuffer(string: "foo"))
  428. assertThat(self.recorder.metadata, .is(.none()))
  429. assertThat(self.recorder.messages, .isEmpty())
  430. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  431. }
  432. func testReceiveMultipleHeaders() {
  433. let handler = self.makeHandler(observerFactory: self.neverReceivesMessage(context:))
  434. handler.receiveMetadata([:])
  435. assertThat(self.recorder.metadata, .is([:]))
  436. handler.receiveMetadata([:])
  437. assertThat(self.recorder.messages, .isEmpty())
  438. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  439. }
  440. func testFinishBeforeStarting() {
  441. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  442. handler.finish()
  443. assertThat(self.recorder.metadata, .is(.none()))
  444. assertThat(self.recorder.messages, .isEmpty())
  445. assertThat(self.recorder.status, .is(.none()))
  446. assertThat(self.recorder.trailers, .is(.none()))
  447. }
  448. func testFinishAfterHeaders() {
  449. let handler = self.makeHandler(observerFactory: self.joinWithSpaces(context:))
  450. handler.receiveMetadata([:])
  451. assertThat(self.recorder.metadata, .is([:]))
  452. handler.finish()
  453. assertThat(self.recorder.messages, .isEmpty())
  454. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  455. assertThat(self.recorder.trailers, .is([:]))
  456. }
  457. func testFinishAfterMessage() {
  458. let handler = self.makeHandler(observerFactory: self.joinWithSpaces(context:))
  459. handler.receiveMetadata([:])
  460. handler.receiveMessage(ByteBuffer(string: "hello"))
  461. handler.finish()
  462. assertThat(self.recorder.messages, .isEmpty())
  463. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  464. assertThat(self.recorder.trailers, .is([:]))
  465. }
  466. }
  467. class ServerStreamingServerHandlerTests: ServerHandlerTestCaseBase {
  468. private func makeHandler(
  469. encoding: ServerMessageEncoding = .disabled,
  470. userFunction: @escaping (String, StreamingResponseCallContext<String>)
  471. -> EventLoopFuture<GRPCStatus>
  472. ) -> ServerStreamingServerHandler<StringSerializer, StringDeserializer> {
  473. return ServerStreamingServerHandler(
  474. context: self.makeCallHandlerContext(encoding: encoding),
  475. requestDeserializer: StringDeserializer(),
  476. responseSerializer: StringSerializer(),
  477. interceptors: [],
  478. userFunction: userFunction
  479. )
  480. }
  481. private func breakOnSpaces(
  482. _ request: String,
  483. context: StreamingResponseCallContext<String>
  484. ) -> EventLoopFuture<GRPCStatus> {
  485. let parts = request.components(separatedBy: " ")
  486. context.sendResponses(parts, promise: nil)
  487. return context.eventLoop.makeSucceededFuture(.ok)
  488. }
  489. private func neverCalled(
  490. _ request: String,
  491. context: StreamingResponseCallContext<String>
  492. ) -> EventLoopFuture<GRPCStatus> {
  493. XCTFail("Unexpected invocation")
  494. return context.eventLoop.makeSucceededFuture(.processingError)
  495. }
  496. private func neverComplete(
  497. _ request: String,
  498. context: StreamingResponseCallContext<String>
  499. ) -> EventLoopFuture<GRPCStatus> {
  500. return context.eventLoop.scheduleTask(deadline: .distantFuture) {
  501. return .processingError
  502. }.futureResult
  503. }
  504. func testHappyPath() {
  505. let handler = self.makeHandler(userFunction: self.breakOnSpaces(_:context:))
  506. handler.receiveMetadata([:])
  507. assertThat(self.recorder.metadata, .is([:]))
  508. handler.receiveMessage(ByteBuffer(string: "a b"))
  509. handler.receiveEnd()
  510. handler.finish()
  511. assertThat(
  512. self.recorder.messages,
  513. .is([ByteBuffer(string: "a"), ByteBuffer(string: "b")])
  514. )
  515. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false]))
  516. assertThat(self.recorder.status, .some(.hasCode(.ok)))
  517. assertThat(self.recorder.trailers, .is([:]))
  518. }
  519. func testHappyPathWithCompressionEnabled() {
  520. let handler = self.makeHandler(
  521. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  522. userFunction: self.breakOnSpaces(_:context:)
  523. )
  524. handler.receiveMetadata([:])
  525. handler.receiveMessage(ByteBuffer(string: "a"))
  526. handler.receiveEnd()
  527. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "a")))
  528. assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  529. }
  530. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  531. let handler = self.makeHandler(
  532. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  533. ) { request, context in
  534. context.compressionEnabled = false
  535. return self.breakOnSpaces(request, context: context)
  536. }
  537. handler.receiveMetadata([:])
  538. handler.receiveMessage(ByteBuffer(string: "a"))
  539. handler.receiveEnd()
  540. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "a")))
  541. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  542. }
  543. func testThrowingDeserializer() {
  544. let handler = ServerStreamingServerHandler(
  545. context: self.makeCallHandlerContext(),
  546. requestDeserializer: ThrowingStringDeserializer(),
  547. responseSerializer: StringSerializer(),
  548. interceptors: [],
  549. userFunction: self.neverCalled(_:context:)
  550. )
  551. handler.receiveMetadata([:])
  552. assertThat(self.recorder.metadata, .is([:]))
  553. let buffer = ByteBuffer(string: "hello")
  554. handler.receiveMessage(buffer)
  555. assertThat(self.recorder.messages, .isEmpty())
  556. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  557. }
  558. func testThrowingSerializer() {
  559. let handler = ServerStreamingServerHandler(
  560. context: self.makeCallHandlerContext(),
  561. requestDeserializer: StringDeserializer(),
  562. responseSerializer: ThrowingStringSerializer(),
  563. interceptors: [],
  564. userFunction: self.breakOnSpaces(_:context:)
  565. )
  566. handler.receiveMetadata([:])
  567. assertThat(self.recorder.metadata, .is([:]))
  568. let buffer = ByteBuffer(string: "1 2 3")
  569. handler.receiveMessage(buffer)
  570. handler.receiveEnd()
  571. assertThat(self.recorder.messages, .isEmpty())
  572. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  573. }
  574. func testUserFunctionReturnsFailedFuture() {
  575. let handler = self.makeHandler { _, context in
  576. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  577. }
  578. handler.receiveMetadata([:])
  579. assertThat(self.recorder.metadata, .is([:]))
  580. let buffer = ByteBuffer(string: "hello")
  581. handler.receiveMessage(buffer)
  582. assertThat(self.recorder.messages, .isEmpty())
  583. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  584. assertThat(self.recorder.status?.message, .is(":("))
  585. }
  586. func testReceiveMessageBeforeHeaders() {
  587. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  588. handler.receiveMessage(ByteBuffer(string: "foo"))
  589. assertThat(self.recorder.metadata, .is(.none()))
  590. assertThat(self.recorder.messages, .isEmpty())
  591. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  592. }
  593. func testReceiveMultipleHeaders() {
  594. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  595. handler.receiveMetadata([:])
  596. assertThat(self.recorder.metadata, .is([:]))
  597. handler.receiveMetadata([:])
  598. assertThat(self.recorder.messages, .isEmpty())
  599. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  600. }
  601. func testReceiveMultipleMessages() {
  602. let handler = self.makeHandler(userFunction: self.neverComplete(_:context:))
  603. handler.receiveMetadata([:])
  604. assertThat(self.recorder.metadata, .is([:]))
  605. let buffer = ByteBuffer(string: "hello")
  606. handler.receiveMessage(buffer)
  607. handler.receiveEnd()
  608. // Send another message before the function completes.
  609. handler.receiveMessage(buffer)
  610. assertThat(self.recorder.messages, .isEmpty())
  611. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  612. }
  613. func testFinishBeforeStarting() {
  614. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  615. handler.finish()
  616. assertThat(self.recorder.metadata, .is(.none()))
  617. assertThat(self.recorder.messages, .isEmpty())
  618. assertThat(self.recorder.status, .is(.none()))
  619. assertThat(self.recorder.trailers, .is(.none()))
  620. }
  621. func testFinishAfterHeaders() {
  622. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  623. handler.receiveMetadata([:])
  624. assertThat(self.recorder.metadata, .is([:]))
  625. handler.finish()
  626. assertThat(self.recorder.messages, .isEmpty())
  627. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  628. assertThat(self.recorder.trailers, .is([:]))
  629. }
  630. func testFinishAfterMessage() {
  631. let handler = self.makeHandler(userFunction: self.neverComplete(_:context:))
  632. handler.receiveMetadata([:])
  633. handler.receiveMessage(ByteBuffer(string: "hello"))
  634. handler.finish()
  635. assertThat(self.recorder.messages, .isEmpty())
  636. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  637. assertThat(self.recorder.trailers, .is([:]))
  638. }
  639. }
  640. // MARK: - Bidirectional Streaming
  641. class BidirectionalStreamingServerHandlerTests: ServerHandlerTestCaseBase {
  642. private func makeHandler(
  643. encoding: ServerMessageEncoding = .disabled,
  644. observerFactory: @escaping (StreamingResponseCallContext<String>)
  645. -> EventLoopFuture<(StreamEvent<String>) -> Void>
  646. ) -> BidirectionalStreamingServerHandler<StringSerializer, StringDeserializer> {
  647. return BidirectionalStreamingServerHandler(
  648. context: self.makeCallHandlerContext(encoding: encoding),
  649. requestDeserializer: StringDeserializer(),
  650. responseSerializer: StringSerializer(),
  651. interceptors: [],
  652. observerFactory: observerFactory
  653. )
  654. }
  655. private func echo(
  656. context: StreamingResponseCallContext<String>
  657. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  658. func onEvent(_ event: StreamEvent<String>) {
  659. switch event {
  660. case let .message(message):
  661. context.sendResponse(message, promise: nil)
  662. case .end:
  663. context.statusPromise.succeed(.ok)
  664. }
  665. }
  666. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  667. }
  668. private func neverReceivesMessage(
  669. context: StreamingResponseCallContext<String>
  670. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  671. func onEvent(_ event: StreamEvent<String>) {
  672. switch event {
  673. case let .message(message):
  674. XCTFail("Unexpected message: '\(message)'")
  675. case .end:
  676. context.statusPromise.succeed(.ok)
  677. }
  678. }
  679. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  680. }
  681. private func neverCalled(
  682. context: StreamingResponseCallContext<String>
  683. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  684. XCTFail("This observer factory should never be called")
  685. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .aborted, message: nil))
  686. }
  687. func testHappyPath() {
  688. let handler = self.makeHandler(observerFactory: self.echo(context:))
  689. handler.receiveMetadata([:])
  690. assertThat(self.recorder.metadata, .is([:]))
  691. handler.receiveMessage(ByteBuffer(string: "1"))
  692. handler.receiveMessage(ByteBuffer(string: "2"))
  693. handler.receiveMessage(ByteBuffer(string: "3"))
  694. handler.receiveEnd()
  695. handler.finish()
  696. assertThat(
  697. self.recorder.messages,
  698. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
  699. )
  700. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false]))
  701. assertThat(self.recorder.status, .some(.hasCode(.ok)))
  702. assertThat(self.recorder.trailers, .is([:]))
  703. }
  704. func testHappyPathWithCompressionEnabled() {
  705. let handler = self.makeHandler(
  706. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  707. observerFactory: self.echo(context:)
  708. )
  709. handler.receiveMetadata([:])
  710. handler.receiveMessage(ByteBuffer(string: "1"))
  711. handler.receiveMessage(ByteBuffer(string: "2"))
  712. handler.receiveMessage(ByteBuffer(string: "3"))
  713. handler.receiveEnd()
  714. assertThat(
  715. self.recorder.messages,
  716. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
  717. )
  718. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([true, true, true]))
  719. }
  720. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  721. let handler = self.makeHandler(
  722. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  723. ) { context in
  724. context.compressionEnabled = false
  725. return self.echo(context: context)
  726. }
  727. handler.receiveMetadata([:])
  728. handler.receiveMessage(ByteBuffer(string: "1"))
  729. handler.receiveMessage(ByteBuffer(string: "2"))
  730. handler.receiveMessage(ByteBuffer(string: "3"))
  731. handler.receiveEnd()
  732. assertThat(
  733. self.recorder.messages,
  734. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
  735. )
  736. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false]))
  737. }
  738. func testThrowingDeserializer() {
  739. let handler = BidirectionalStreamingServerHandler(
  740. context: self.makeCallHandlerContext(),
  741. requestDeserializer: ThrowingStringDeserializer(),
  742. responseSerializer: StringSerializer(),
  743. interceptors: [],
  744. observerFactory: self.neverReceivesMessage(context:)
  745. )
  746. handler.receiveMetadata([:])
  747. assertThat(self.recorder.metadata, .is([:]))
  748. let buffer = ByteBuffer(string: "hello")
  749. handler.receiveMessage(buffer)
  750. assertThat(self.recorder.messages, .isEmpty())
  751. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  752. }
  753. func testThrowingSerializer() {
  754. let handler = BidirectionalStreamingServerHandler(
  755. context: self.makeCallHandlerContext(),
  756. requestDeserializer: StringDeserializer(),
  757. responseSerializer: ThrowingStringSerializer(),
  758. interceptors: [],
  759. observerFactory: self.echo(context:)
  760. )
  761. handler.receiveMetadata([:])
  762. assertThat(self.recorder.metadata, .is([:]))
  763. let buffer = ByteBuffer(string: "hello")
  764. handler.receiveMessage(buffer)
  765. handler.receiveEnd()
  766. assertThat(self.recorder.messages, .isEmpty())
  767. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  768. }
  769. func testObserverFactoryReturnsFailedFuture() {
  770. let handler = self.makeHandler { context in
  771. context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  772. }
  773. handler.receiveMetadata([:])
  774. assertThat(self.recorder.messages, .isEmpty())
  775. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  776. assertThat(self.recorder.status?.message, .is(":("))
  777. }
  778. func testDelayedObserverFactory() {
  779. let promise = self.eventLoop.makePromise(of: Void.self)
  780. let handler = self.makeHandler { context in
  781. return promise.futureResult.flatMap {
  782. self.echo(context: context)
  783. }
  784. }
  785. handler.receiveMetadata([:])
  786. // Queue up some messages.
  787. handler.receiveMessage(ByteBuffer(string: "1"))
  788. // Succeed the observer block.
  789. promise.succeed(())
  790. // A few more messages.
  791. handler.receiveMessage(ByteBuffer(string: "2"))
  792. handler.receiveEnd()
  793. assertThat(
  794. self.recorder.messages,
  795. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2")])
  796. )
  797. assertThat(self.recorder.status, .some(.hasCode(.ok)))
  798. }
  799. func testDelayedObserverFactoryAllMessagesBeforeSucceeding() {
  800. let promise = self.eventLoop.makePromise(of: Void.self)
  801. let handler = self.makeHandler { context in
  802. return promise.futureResult.flatMap {
  803. self.echo(context: context)
  804. }
  805. }
  806. handler.receiveMetadata([:])
  807. // Queue up some messages.
  808. handler.receiveMessage(ByteBuffer(string: "1"))
  809. handler.receiveMessage(ByteBuffer(string: "2"))
  810. handler.receiveEnd()
  811. // Succeed the observer block.
  812. promise.succeed(())
  813. assertThat(
  814. self.recorder.messages,
  815. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2")])
  816. )
  817. assertThat(self.recorder.status, .some(.hasCode(.ok)))
  818. }
  819. func testReceiveMessageBeforeHeaders() {
  820. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  821. handler.receiveMessage(ByteBuffer(string: "foo"))
  822. assertThat(self.recorder.metadata, .is(.none()))
  823. assertThat(self.recorder.messages, .isEmpty())
  824. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  825. }
  826. func testReceiveMultipleHeaders() {
  827. let handler = self.makeHandler(observerFactory: self.neverReceivesMessage(context:))
  828. handler.receiveMetadata([:])
  829. assertThat(self.recorder.metadata, .is([:]))
  830. handler.receiveMetadata([:])
  831. assertThat(self.recorder.messages, .isEmpty())
  832. assertThat(self.recorder.status, .some(.hasCode(.internalError)))
  833. }
  834. func testFinishBeforeStarting() {
  835. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  836. handler.finish()
  837. assertThat(self.recorder.metadata, .is(.none()))
  838. assertThat(self.recorder.messages, .isEmpty())
  839. assertThat(self.recorder.status, .is(.none()))
  840. assertThat(self.recorder.trailers, .is(.none()))
  841. }
  842. func testFinishAfterHeaders() {
  843. let handler = self.makeHandler(observerFactory: self.echo(context:))
  844. handler.receiveMetadata([:])
  845. assertThat(self.recorder.metadata, .is([:]))
  846. handler.finish()
  847. assertThat(self.recorder.messages, .isEmpty())
  848. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  849. assertThat(self.recorder.trailers, .is([:]))
  850. }
  851. func testFinishAfterMessage() {
  852. let handler = self.makeHandler(observerFactory: self.echo(context:))
  853. handler.receiveMetadata([:])
  854. handler.receiveMessage(ByteBuffer(string: "hello"))
  855. handler.finish()
  856. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "hello")))
  857. assertThat(self.recorder.status, .some(.hasCode(.unavailable)))
  858. assertThat(self.recorder.trailers, .is([:]))
  859. }
  860. }