UnaryServerHandlerTests.swift 34 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import XCTest
  21. // MARK: - Utils
  22. final class ResponseRecorder: GRPCServerResponseWriter {
  23. var metadata: HPACKHeaders?
  24. var messages: [ByteBuffer] = []
  25. var messageMetadata: [MessageMetadata] = []
  26. var status: GRPCStatus?
  27. var trailers: HPACKHeaders?
  28. func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
  29. XCTAssertNil(self.metadata)
  30. self.metadata = metadata
  31. promise?.succeed(())
  32. }
  33. func sendMessage(
  34. _ bytes: ByteBuffer,
  35. metadata: MessageMetadata,
  36. promise: EventLoopPromise<Void>?
  37. ) {
  38. self.messages.append(bytes)
  39. self.messageMetadata.append(metadata)
  40. promise?.succeed(())
  41. }
  42. func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  43. XCTAssertNil(self.status)
  44. XCTAssertNil(self.trailers)
  45. self.status = status
  46. self.trailers = trailers
  47. promise?.succeed(())
  48. }
  49. }
  50. protocol ServerHandlerTestCase: GRPCTestCase {
  51. var eventLoop: EmbeddedEventLoop { get }
  52. var allocator: ByteBufferAllocator { get }
  53. var recorder: ResponseRecorder { get }
  54. }
  55. extension ServerHandlerTestCase {
  56. func makeCallHandlerContext(encoding: ServerMessageEncoding = .disabled) -> CallHandlerContext {
  57. return CallHandlerContext(
  58. errorDelegate: nil,
  59. logger: self.logger,
  60. encoding: encoding,
  61. eventLoop: self.eventLoop,
  62. path: "/ignored",
  63. remoteAddress: nil,
  64. responseWriter: self.recorder,
  65. allocator: self.allocator,
  66. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  67. )
  68. }
  69. }
  70. // MARK: - Unary
  71. class UnaryServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  72. let eventLoop = EmbeddedEventLoop()
  73. let allocator = ByteBufferAllocator()
  74. let recorder = ResponseRecorder()
  75. private func makeHandler(
  76. encoding: ServerMessageEncoding = .disabled,
  77. function: @escaping (String, StatusOnlyCallContext) -> EventLoopFuture<String>
  78. ) -> UnaryServerHandler<StringSerializer, StringDeserializer> {
  79. return UnaryServerHandler(
  80. context: self.makeCallHandlerContext(encoding: encoding),
  81. requestDeserializer: StringDeserializer(),
  82. responseSerializer: StringSerializer(),
  83. interceptors: [],
  84. userFunction: function
  85. )
  86. }
  87. private func echo(_ request: String, context: StatusOnlyCallContext) -> EventLoopFuture<String> {
  88. return context.eventLoop.makeSucceededFuture(request)
  89. }
  90. private func neverComplete(
  91. _ request: String,
  92. context: StatusOnlyCallContext
  93. ) -> EventLoopFuture<String> {
  94. let scheduled = context.eventLoop.scheduleTask(deadline: .distantFuture) {
  95. return request
  96. }
  97. return scheduled.futureResult
  98. }
  99. private func neverCalled(
  100. _ request: String,
  101. context: StatusOnlyCallContext
  102. ) -> EventLoopFuture<String> {
  103. XCTFail("Unexpected function invocation")
  104. return context.eventLoop.makeFailedFuture(GRPCError.InvalidState(""))
  105. }
  106. func testHappyPath() {
  107. let handler = self.makeHandler(function: self.echo(_:context:))
  108. handler.receiveMetadata([:])
  109. assertThat(self.recorder.metadata, .is([:]))
  110. let buffer = ByteBuffer(string: "hello")
  111. handler.receiveMessage(buffer)
  112. handler.receiveEnd()
  113. handler.finish()
  114. assertThat(self.recorder.messages.first, .is(buffer))
  115. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  116. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  117. assertThat(self.recorder.trailers, .is([:]))
  118. }
  119. func testHappyPathWithCompressionEnabled() {
  120. let handler = self.makeHandler(
  121. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  122. function: self.echo(_:context:)
  123. )
  124. handler.receiveMetadata([:])
  125. let buffer = ByteBuffer(string: "hello")
  126. handler.receiveMessage(buffer)
  127. assertThat(self.recorder.messages.first, .is(buffer))
  128. assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  129. }
  130. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  131. let handler = self.makeHandler(
  132. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  133. ) { request, context in
  134. context.compressionEnabled = false
  135. return self.echo(request, context: context)
  136. }
  137. handler.receiveMetadata([:])
  138. let buffer = ByteBuffer(string: "hello")
  139. handler.receiveMessage(buffer)
  140. assertThat(self.recorder.messages.first, .is(buffer))
  141. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  142. }
  143. func testThrowingDeserializer() {
  144. let handler = UnaryServerHandler(
  145. context: self.makeCallHandlerContext(),
  146. requestDeserializer: ThrowingStringDeserializer(),
  147. responseSerializer: StringSerializer(),
  148. interceptors: [],
  149. userFunction: self.neverCalled(_:context:)
  150. )
  151. handler.receiveMetadata([:])
  152. assertThat(self.recorder.metadata, .is([:]))
  153. let buffer = ByteBuffer(string: "hello")
  154. handler.receiveMessage(buffer)
  155. assertThat(self.recorder.messages, .isEmpty())
  156. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  157. }
  158. func testThrowingSerializer() {
  159. let handler = UnaryServerHandler(
  160. context: self.makeCallHandlerContext(),
  161. requestDeserializer: StringDeserializer(),
  162. responseSerializer: ThrowingStringSerializer(),
  163. interceptors: [],
  164. userFunction: self.echo(_:context:)
  165. )
  166. handler.receiveMetadata([:])
  167. assertThat(self.recorder.metadata, .is([:]))
  168. let buffer = ByteBuffer(string: "hello")
  169. handler.receiveMessage(buffer)
  170. handler.receiveEnd()
  171. assertThat(self.recorder.messages, .isEmpty())
  172. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  173. }
  174. func testUserFunctionReturnsFailedFuture() {
  175. let handler = self.makeHandler { _, context in
  176. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  177. }
  178. handler.receiveMetadata([:])
  179. assertThat(self.recorder.metadata, .is([:]))
  180. let buffer = ByteBuffer(string: "hello")
  181. handler.receiveMessage(buffer)
  182. assertThat(self.recorder.messages, .isEmpty())
  183. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  184. assertThat(self.recorder.status?.message, .is(":("))
  185. }
  186. func testReceiveMessageBeforeHeaders() {
  187. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  188. handler.receiveMessage(ByteBuffer(string: "foo"))
  189. assertThat(self.recorder.metadata, .is(.nil()))
  190. assertThat(self.recorder.messages, .isEmpty())
  191. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  192. }
  193. func testReceiveMultipleHeaders() {
  194. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  195. handler.receiveMetadata([:])
  196. assertThat(self.recorder.metadata, .is([:]))
  197. handler.receiveMetadata([:])
  198. assertThat(self.recorder.messages, .isEmpty())
  199. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  200. }
  201. func testReceiveMultipleMessages() {
  202. let handler = self.makeHandler(function: self.neverComplete(_:context:))
  203. handler.receiveMetadata([:])
  204. assertThat(self.recorder.metadata, .is([:]))
  205. let buffer = ByteBuffer(string: "hello")
  206. handler.receiveMessage(buffer)
  207. handler.receiveEnd()
  208. // Send another message before the function completes.
  209. handler.receiveMessage(buffer)
  210. assertThat(self.recorder.messages, .isEmpty())
  211. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  212. }
  213. func testFinishBeforeStarting() {
  214. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  215. handler.finish()
  216. assertThat(self.recorder.metadata, .is(.nil()))
  217. assertThat(self.recorder.messages, .isEmpty())
  218. assertThat(self.recorder.status, .is(.nil()))
  219. assertThat(self.recorder.trailers, .is(.nil()))
  220. }
  221. func testFinishAfterHeaders() {
  222. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  223. handler.receiveMetadata([:])
  224. assertThat(self.recorder.metadata, .is([:]))
  225. handler.finish()
  226. assertThat(self.recorder.messages, .isEmpty())
  227. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  228. assertThat(self.recorder.trailers, .is([:]))
  229. }
  230. func testFinishAfterMessage() {
  231. let handler = self.makeHandler(function: self.neverComplete(_:context:))
  232. handler.receiveMetadata([:])
  233. handler.receiveMessage(ByteBuffer(string: "hello"))
  234. handler.finish()
  235. assertThat(self.recorder.messages, .isEmpty())
  236. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  237. assertThat(self.recorder.trailers, .is([:]))
  238. }
  239. }
  240. // MARK: - Client Streaming
  241. class ClientStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  242. let eventLoop = EmbeddedEventLoop()
  243. let allocator = ByteBufferAllocator()
  244. let recorder = ResponseRecorder()
  245. private func makeHandler(
  246. encoding: ServerMessageEncoding = .disabled,
  247. observerFactory: @escaping (UnaryResponseCallContext<String>)
  248. -> EventLoopFuture<(StreamEvent<String>) -> Void>
  249. ) -> ClientStreamingServerHandler<StringSerializer, StringDeserializer> {
  250. return ClientStreamingServerHandler(
  251. context: self.makeCallHandlerContext(encoding: encoding),
  252. requestDeserializer: StringDeserializer(),
  253. responseSerializer: StringSerializer(),
  254. interceptors: [],
  255. observerFactory: observerFactory
  256. )
  257. }
  258. private func joinWithSpaces(
  259. context: UnaryResponseCallContext<String>
  260. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  261. var messages: [String] = []
  262. func onEvent(_ event: StreamEvent<String>) {
  263. switch event {
  264. case let .message(message):
  265. messages.append(message)
  266. case .end:
  267. context.responsePromise.succeed(messages.joined(separator: " "))
  268. }
  269. }
  270. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  271. }
  272. private func neverReceivesMessage(
  273. context: UnaryResponseCallContext<String>
  274. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  275. func onEvent(_ event: StreamEvent<String>) {
  276. switch event {
  277. case let .message(message):
  278. XCTFail("Unexpected message: '\(message)'")
  279. case .end:
  280. context.responsePromise.succeed("")
  281. }
  282. }
  283. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  284. }
  285. private func neverCalled(
  286. context: UnaryResponseCallContext<String>
  287. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  288. XCTFail("This observer factory should never be called")
  289. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .aborted, message: nil))
  290. }
  291. func testHappyPath() {
  292. let handler = self.makeHandler(observerFactory: self.joinWithSpaces(context:))
  293. handler.receiveMetadata([:])
  294. assertThat(self.recorder.metadata, .is([:]))
  295. handler.receiveMessage(ByteBuffer(string: "1"))
  296. handler.receiveMessage(ByteBuffer(string: "2"))
  297. handler.receiveMessage(ByteBuffer(string: "3"))
  298. handler.receiveEnd()
  299. handler.finish()
  300. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  301. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  302. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  303. assertThat(self.recorder.trailers, .is([:]))
  304. }
  305. func testHappyPathWithCompressionEnabled() {
  306. let handler = self.makeHandler(
  307. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  308. observerFactory: self.joinWithSpaces(context:)
  309. )
  310. handler.receiveMetadata([:])
  311. handler.receiveMessage(ByteBuffer(string: "1"))
  312. handler.receiveMessage(ByteBuffer(string: "2"))
  313. handler.receiveMessage(ByteBuffer(string: "3"))
  314. handler.receiveEnd()
  315. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  316. assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  317. }
  318. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  319. let handler = self.makeHandler(
  320. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  321. ) { context in
  322. context.compressionEnabled = false
  323. return self.joinWithSpaces(context: context)
  324. }
  325. handler.receiveMetadata([:])
  326. handler.receiveMessage(ByteBuffer(string: "1"))
  327. handler.receiveMessage(ByteBuffer(string: "2"))
  328. handler.receiveMessage(ByteBuffer(string: "3"))
  329. handler.receiveEnd()
  330. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  331. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  332. }
  333. func testThrowingDeserializer() {
  334. let handler = ClientStreamingServerHandler(
  335. context: self.makeCallHandlerContext(),
  336. requestDeserializer: ThrowingStringDeserializer(),
  337. responseSerializer: StringSerializer(),
  338. interceptors: [],
  339. observerFactory: self.neverReceivesMessage(context:)
  340. )
  341. handler.receiveMetadata([:])
  342. assertThat(self.recorder.metadata, .is([:]))
  343. let buffer = ByteBuffer(string: "hello")
  344. handler.receiveMessage(buffer)
  345. assertThat(self.recorder.messages, .isEmpty())
  346. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  347. }
  348. func testThrowingSerializer() {
  349. let handler = ClientStreamingServerHandler(
  350. context: self.makeCallHandlerContext(),
  351. requestDeserializer: StringDeserializer(),
  352. responseSerializer: ThrowingStringSerializer(),
  353. interceptors: [],
  354. observerFactory: self.joinWithSpaces(context:)
  355. )
  356. handler.receiveMetadata([:])
  357. assertThat(self.recorder.metadata, .is([:]))
  358. let buffer = ByteBuffer(string: "hello")
  359. handler.receiveMessage(buffer)
  360. handler.receiveEnd()
  361. assertThat(self.recorder.messages, .isEmpty())
  362. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  363. }
  364. func testObserverFactoryReturnsFailedFuture() {
  365. let handler = self.makeHandler { context in
  366. context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  367. }
  368. handler.receiveMetadata([:])
  369. assertThat(self.recorder.messages, .isEmpty())
  370. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  371. assertThat(self.recorder.status?.message, .is(":("))
  372. }
  373. func testDelayedObserverFactory() {
  374. let promise = self.eventLoop.makePromise(of: Void.self)
  375. let handler = self.makeHandler { context in
  376. return promise.futureResult.flatMap {
  377. self.joinWithSpaces(context: context)
  378. }
  379. }
  380. handler.receiveMetadata([:])
  381. // Queue up some messages.
  382. handler.receiveMessage(ByteBuffer(string: "1"))
  383. handler.receiveMessage(ByteBuffer(string: "2"))
  384. handler.receiveMessage(ByteBuffer(string: "3"))
  385. // Succeed the observer block.
  386. promise.succeed(())
  387. // A few more messages.
  388. handler.receiveMessage(ByteBuffer(string: "4"))
  389. handler.receiveMessage(ByteBuffer(string: "5"))
  390. handler.receiveEnd()
  391. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3 4 5")))
  392. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  393. }
  394. func testDelayedObserverFactoryAllMessagesBeforeSucceeding() {
  395. let promise = self.eventLoop.makePromise(of: Void.self)
  396. let handler = self.makeHandler { context in
  397. return promise.futureResult.flatMap {
  398. self.joinWithSpaces(context: context)
  399. }
  400. }
  401. handler.receiveMetadata([:])
  402. // Queue up some messages.
  403. handler.receiveMessage(ByteBuffer(string: "1"))
  404. handler.receiveMessage(ByteBuffer(string: "2"))
  405. handler.receiveMessage(ByteBuffer(string: "3"))
  406. handler.receiveEnd()
  407. // Succeed the observer block.
  408. promise.succeed(())
  409. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  410. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  411. }
  412. func testReceiveMessageBeforeHeaders() {
  413. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  414. handler.receiveMessage(ByteBuffer(string: "foo"))
  415. assertThat(self.recorder.metadata, .is(.nil()))
  416. assertThat(self.recorder.messages, .isEmpty())
  417. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  418. }
  419. func testReceiveMultipleHeaders() {
  420. let handler = self.makeHandler(observerFactory: self.neverReceivesMessage(context:))
  421. handler.receiveMetadata([:])
  422. assertThat(self.recorder.metadata, .is([:]))
  423. handler.receiveMetadata([:])
  424. assertThat(self.recorder.messages, .isEmpty())
  425. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  426. }
  427. func testFinishBeforeStarting() {
  428. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  429. handler.finish()
  430. assertThat(self.recorder.metadata, .is(.nil()))
  431. assertThat(self.recorder.messages, .isEmpty())
  432. assertThat(self.recorder.status, .is(.nil()))
  433. assertThat(self.recorder.trailers, .is(.nil()))
  434. }
  435. func testFinishAfterHeaders() {
  436. let handler = self.makeHandler(observerFactory: self.joinWithSpaces(context:))
  437. handler.receiveMetadata([:])
  438. assertThat(self.recorder.metadata, .is([:]))
  439. handler.finish()
  440. assertThat(self.recorder.messages, .isEmpty())
  441. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  442. assertThat(self.recorder.trailers, .is([:]))
  443. }
  444. func testFinishAfterMessage() {
  445. let handler = self.makeHandler(observerFactory: self.joinWithSpaces(context:))
  446. handler.receiveMetadata([:])
  447. handler.receiveMessage(ByteBuffer(string: "hello"))
  448. handler.finish()
  449. assertThat(self.recorder.messages, .isEmpty())
  450. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  451. assertThat(self.recorder.trailers, .is([:]))
  452. }
  453. }
  454. class ServerStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  455. let eventLoop = EmbeddedEventLoop()
  456. let allocator = ByteBufferAllocator()
  457. let recorder = ResponseRecorder()
  458. private func makeHandler(
  459. encoding: ServerMessageEncoding = .disabled,
  460. userFunction: @escaping (String, StreamingResponseCallContext<String>)
  461. -> EventLoopFuture<GRPCStatus>
  462. ) -> ServerStreamingServerHandler<StringSerializer, StringDeserializer> {
  463. return ServerStreamingServerHandler(
  464. context: self.makeCallHandlerContext(encoding: encoding),
  465. requestDeserializer: StringDeserializer(),
  466. responseSerializer: StringSerializer(),
  467. interceptors: [],
  468. userFunction: userFunction
  469. )
  470. }
  471. private func breakOnSpaces(
  472. _ request: String,
  473. context: StreamingResponseCallContext<String>
  474. ) -> EventLoopFuture<GRPCStatus> {
  475. let parts = request.components(separatedBy: " ")
  476. context.sendResponses(parts, promise: nil)
  477. return context.eventLoop.makeSucceededFuture(.ok)
  478. }
  479. private func neverCalled(
  480. _ request: String,
  481. context: StreamingResponseCallContext<String>
  482. ) -> EventLoopFuture<GRPCStatus> {
  483. XCTFail("Unexpected invocation")
  484. return context.eventLoop.makeSucceededFuture(.processingError)
  485. }
  486. private func neverComplete(
  487. _ request: String,
  488. context: StreamingResponseCallContext<String>
  489. ) -> EventLoopFuture<GRPCStatus> {
  490. return context.eventLoop.scheduleTask(deadline: .distantFuture) {
  491. return .processingError
  492. }.futureResult
  493. }
  494. func testHappyPath() {
  495. let handler = self.makeHandler(userFunction: self.breakOnSpaces(_:context:))
  496. handler.receiveMetadata([:])
  497. assertThat(self.recorder.metadata, .is([:]))
  498. handler.receiveMessage(ByteBuffer(string: "a b"))
  499. handler.receiveEnd()
  500. handler.finish()
  501. assertThat(
  502. self.recorder.messages,
  503. .is([ByteBuffer(string: "a"), ByteBuffer(string: "b")])
  504. )
  505. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false]))
  506. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  507. assertThat(self.recorder.trailers, .is([:]))
  508. }
  509. func testHappyPathWithCompressionEnabled() {
  510. let handler = self.makeHandler(
  511. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  512. userFunction: self.breakOnSpaces(_:context:)
  513. )
  514. handler.receiveMetadata([:])
  515. handler.receiveMessage(ByteBuffer(string: "a"))
  516. handler.receiveEnd()
  517. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "a")))
  518. assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  519. }
  520. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  521. let handler = self.makeHandler(
  522. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  523. ) { request, context in
  524. context.compressionEnabled = false
  525. return self.breakOnSpaces(request, context: context)
  526. }
  527. handler.receiveMetadata([:])
  528. handler.receiveMessage(ByteBuffer(string: "a"))
  529. handler.receiveEnd()
  530. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "a")))
  531. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  532. }
  533. func testThrowingDeserializer() {
  534. let handler = ServerStreamingServerHandler(
  535. context: self.makeCallHandlerContext(),
  536. requestDeserializer: ThrowingStringDeserializer(),
  537. responseSerializer: StringSerializer(),
  538. interceptors: [],
  539. userFunction: self.neverCalled(_:context:)
  540. )
  541. handler.receiveMetadata([:])
  542. assertThat(self.recorder.metadata, .is([:]))
  543. let buffer = ByteBuffer(string: "hello")
  544. handler.receiveMessage(buffer)
  545. assertThat(self.recorder.messages, .isEmpty())
  546. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  547. }
  548. func testThrowingSerializer() {
  549. let handler = ServerStreamingServerHandler(
  550. context: self.makeCallHandlerContext(),
  551. requestDeserializer: StringDeserializer(),
  552. responseSerializer: ThrowingStringSerializer(),
  553. interceptors: [],
  554. userFunction: self.breakOnSpaces(_:context:)
  555. )
  556. handler.receiveMetadata([:])
  557. assertThat(self.recorder.metadata, .is([:]))
  558. let buffer = ByteBuffer(string: "1 2 3")
  559. handler.receiveMessage(buffer)
  560. handler.receiveEnd()
  561. assertThat(self.recorder.messages, .isEmpty())
  562. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  563. }
  564. func testUserFunctionReturnsFailedFuture() {
  565. let handler = self.makeHandler { _, context in
  566. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  567. }
  568. handler.receiveMetadata([:])
  569. assertThat(self.recorder.metadata, .is([:]))
  570. let buffer = ByteBuffer(string: "hello")
  571. handler.receiveMessage(buffer)
  572. assertThat(self.recorder.messages, .isEmpty())
  573. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  574. assertThat(self.recorder.status?.message, .is(":("))
  575. }
  576. func testReceiveMessageBeforeHeaders() {
  577. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  578. handler.receiveMessage(ByteBuffer(string: "foo"))
  579. assertThat(self.recorder.metadata, .is(.nil()))
  580. assertThat(self.recorder.messages, .isEmpty())
  581. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  582. }
  583. func testReceiveMultipleHeaders() {
  584. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  585. handler.receiveMetadata([:])
  586. assertThat(self.recorder.metadata, .is([:]))
  587. handler.receiveMetadata([:])
  588. assertThat(self.recorder.messages, .isEmpty())
  589. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  590. }
  591. func testReceiveMultipleMessages() {
  592. let handler = self.makeHandler(userFunction: self.neverComplete(_:context:))
  593. handler.receiveMetadata([:])
  594. assertThat(self.recorder.metadata, .is([:]))
  595. let buffer = ByteBuffer(string: "hello")
  596. handler.receiveMessage(buffer)
  597. handler.receiveEnd()
  598. // Send another message before the function completes.
  599. handler.receiveMessage(buffer)
  600. assertThat(self.recorder.messages, .isEmpty())
  601. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  602. }
  603. func testFinishBeforeStarting() {
  604. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  605. handler.finish()
  606. assertThat(self.recorder.metadata, .is(.nil()))
  607. assertThat(self.recorder.messages, .isEmpty())
  608. assertThat(self.recorder.status, .is(.nil()))
  609. assertThat(self.recorder.trailers, .is(.nil()))
  610. }
  611. func testFinishAfterHeaders() {
  612. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  613. handler.receiveMetadata([:])
  614. assertThat(self.recorder.metadata, .is([:]))
  615. handler.finish()
  616. assertThat(self.recorder.messages, .isEmpty())
  617. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  618. assertThat(self.recorder.trailers, .is([:]))
  619. }
  620. func testFinishAfterMessage() {
  621. let handler = self.makeHandler(userFunction: self.neverComplete(_:context:))
  622. handler.receiveMetadata([:])
  623. handler.receiveMessage(ByteBuffer(string: "hello"))
  624. handler.finish()
  625. assertThat(self.recorder.messages, .isEmpty())
  626. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  627. assertThat(self.recorder.trailers, .is([:]))
  628. }
  629. }
  630. // MARK: - Bidirectional Streaming
  631. class BidirectionalStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  632. let eventLoop = EmbeddedEventLoop()
  633. let allocator = ByteBufferAllocator()
  634. let recorder = ResponseRecorder()
  635. private func makeHandler(
  636. encoding: ServerMessageEncoding = .disabled,
  637. observerFactory: @escaping (StreamingResponseCallContext<String>)
  638. -> EventLoopFuture<(StreamEvent<String>) -> Void>
  639. ) -> BidirectionalStreamingServerHandler<StringSerializer, StringDeserializer> {
  640. return BidirectionalStreamingServerHandler(
  641. context: self.makeCallHandlerContext(encoding: encoding),
  642. requestDeserializer: StringDeserializer(),
  643. responseSerializer: StringSerializer(),
  644. interceptors: [],
  645. observerFactory: observerFactory
  646. )
  647. }
  648. private func echo(
  649. context: StreamingResponseCallContext<String>
  650. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  651. func onEvent(_ event: StreamEvent<String>) {
  652. switch event {
  653. case let .message(message):
  654. context.sendResponse(message, promise: nil)
  655. case .end:
  656. context.statusPromise.succeed(.ok)
  657. }
  658. }
  659. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  660. }
  661. private func neverReceivesMessage(
  662. context: StreamingResponseCallContext<String>
  663. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  664. func onEvent(_ event: StreamEvent<String>) {
  665. switch event {
  666. case let .message(message):
  667. XCTFail("Unexpected message: '\(message)'")
  668. case .end:
  669. context.statusPromise.succeed(.ok)
  670. }
  671. }
  672. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  673. }
  674. private func neverCalled(
  675. context: StreamingResponseCallContext<String>
  676. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  677. XCTFail("This observer factory should never be called")
  678. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .aborted, message: nil))
  679. }
  680. func testHappyPath() {
  681. let handler = self.makeHandler(observerFactory: self.echo(context:))
  682. handler.receiveMetadata([:])
  683. assertThat(self.recorder.metadata, .is([:]))
  684. handler.receiveMessage(ByteBuffer(string: "1"))
  685. handler.receiveMessage(ByteBuffer(string: "2"))
  686. handler.receiveMessage(ByteBuffer(string: "3"))
  687. handler.receiveEnd()
  688. handler.finish()
  689. assertThat(
  690. self.recorder.messages,
  691. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
  692. )
  693. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false]))
  694. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  695. assertThat(self.recorder.trailers, .is([:]))
  696. }
  697. func testHappyPathWithCompressionEnabled() {
  698. let handler = self.makeHandler(
  699. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  700. observerFactory: self.echo(context:)
  701. )
  702. handler.receiveMetadata([:])
  703. handler.receiveMessage(ByteBuffer(string: "1"))
  704. handler.receiveMessage(ByteBuffer(string: "2"))
  705. handler.receiveMessage(ByteBuffer(string: "3"))
  706. handler.receiveEnd()
  707. assertThat(
  708. self.recorder.messages,
  709. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
  710. )
  711. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([true, true, true]))
  712. }
  713. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  714. let handler = self.makeHandler(
  715. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  716. ) { context in
  717. context.compressionEnabled = false
  718. return self.echo(context: context)
  719. }
  720. handler.receiveMetadata([:])
  721. handler.receiveMessage(ByteBuffer(string: "1"))
  722. handler.receiveMessage(ByteBuffer(string: "2"))
  723. handler.receiveMessage(ByteBuffer(string: "3"))
  724. handler.receiveEnd()
  725. assertThat(
  726. self.recorder.messages,
  727. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
  728. )
  729. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false]))
  730. }
  731. func testThrowingDeserializer() {
  732. let handler = BidirectionalStreamingServerHandler(
  733. context: self.makeCallHandlerContext(),
  734. requestDeserializer: ThrowingStringDeserializer(),
  735. responseSerializer: StringSerializer(),
  736. interceptors: [],
  737. observerFactory: self.neverReceivesMessage(context:)
  738. )
  739. handler.receiveMetadata([:])
  740. assertThat(self.recorder.metadata, .is([:]))
  741. let buffer = ByteBuffer(string: "hello")
  742. handler.receiveMessage(buffer)
  743. assertThat(self.recorder.messages, .isEmpty())
  744. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  745. }
  746. func testThrowingSerializer() {
  747. let handler = BidirectionalStreamingServerHandler(
  748. context: self.makeCallHandlerContext(),
  749. requestDeserializer: StringDeserializer(),
  750. responseSerializer: ThrowingStringSerializer(),
  751. interceptors: [],
  752. observerFactory: self.echo(context:)
  753. )
  754. handler.receiveMetadata([:])
  755. assertThat(self.recorder.metadata, .is([:]))
  756. let buffer = ByteBuffer(string: "hello")
  757. handler.receiveMessage(buffer)
  758. handler.receiveEnd()
  759. assertThat(self.recorder.messages, .isEmpty())
  760. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  761. }
  762. func testObserverFactoryReturnsFailedFuture() {
  763. let handler = self.makeHandler { context in
  764. context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  765. }
  766. handler.receiveMetadata([:])
  767. assertThat(self.recorder.messages, .isEmpty())
  768. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  769. assertThat(self.recorder.status?.message, .is(":("))
  770. }
  771. func testDelayedObserverFactory() {
  772. let promise = self.eventLoop.makePromise(of: Void.self)
  773. let handler = self.makeHandler { context in
  774. return promise.futureResult.flatMap {
  775. self.echo(context: context)
  776. }
  777. }
  778. handler.receiveMetadata([:])
  779. // Queue up some messages.
  780. handler.receiveMessage(ByteBuffer(string: "1"))
  781. // Succeed the observer block.
  782. promise.succeed(())
  783. // A few more messages.
  784. handler.receiveMessage(ByteBuffer(string: "2"))
  785. handler.receiveEnd()
  786. assertThat(
  787. self.recorder.messages,
  788. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2")])
  789. )
  790. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  791. }
  792. func testDelayedObserverFactoryAllMessagesBeforeSucceeding() {
  793. let promise = self.eventLoop.makePromise(of: Void.self)
  794. let handler = self.makeHandler { context in
  795. return promise.futureResult.flatMap {
  796. self.echo(context: context)
  797. }
  798. }
  799. handler.receiveMetadata([:])
  800. // Queue up some messages.
  801. handler.receiveMessage(ByteBuffer(string: "1"))
  802. handler.receiveMessage(ByteBuffer(string: "2"))
  803. handler.receiveEnd()
  804. // Succeed the observer block.
  805. promise.succeed(())
  806. assertThat(
  807. self.recorder.messages,
  808. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2")])
  809. )
  810. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  811. }
  812. func testReceiveMessageBeforeHeaders() {
  813. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  814. handler.receiveMessage(ByteBuffer(string: "foo"))
  815. assertThat(self.recorder.metadata, .is(.nil()))
  816. assertThat(self.recorder.messages, .isEmpty())
  817. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  818. }
  819. func testReceiveMultipleHeaders() {
  820. let handler = self.makeHandler(observerFactory: self.neverReceivesMessage(context:))
  821. handler.receiveMetadata([:])
  822. assertThat(self.recorder.metadata, .is([:]))
  823. handler.receiveMetadata([:])
  824. assertThat(self.recorder.messages, .isEmpty())
  825. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  826. }
  827. func testFinishBeforeStarting() {
  828. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  829. handler.finish()
  830. assertThat(self.recorder.metadata, .is(.nil()))
  831. assertThat(self.recorder.messages, .isEmpty())
  832. assertThat(self.recorder.status, .is(.nil()))
  833. assertThat(self.recorder.trailers, .is(.nil()))
  834. }
  835. func testFinishAfterHeaders() {
  836. let handler = self.makeHandler(observerFactory: self.echo(context:))
  837. handler.receiveMetadata([:])
  838. assertThat(self.recorder.metadata, .is([:]))
  839. handler.finish()
  840. assertThat(self.recorder.messages, .isEmpty())
  841. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  842. assertThat(self.recorder.trailers, .is([:]))
  843. }
  844. func testFinishAfterMessage() {
  845. let handler = self.makeHandler(observerFactory: self.echo(context:))
  846. handler.receiveMetadata([:])
  847. handler.receiveMessage(ByteBuffer(string: "hello"))
  848. handler.finish()
  849. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "hello")))
  850. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  851. assertThat(self.recorder.trailers, .is([:]))
  852. }
  853. }