UnaryServerHandlerTests.swift 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIO
  18. import NIOHPACK
  19. import XCTest
  20. // MARK: - Utils
  21. final class ResponseRecorder: GRPCServerResponseWriter {
  22. var metadata: HPACKHeaders?
  23. var messages: [ByteBuffer] = []
  24. var messageMetadata: [MessageMetadata] = []
  25. var status: GRPCStatus?
  26. var trailers: HPACKHeaders?
  27. func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
  28. XCTAssertNil(self.metadata)
  29. self.metadata = metadata
  30. promise?.succeed(())
  31. }
  32. func sendMessage(
  33. _ bytes: ByteBuffer,
  34. metadata: MessageMetadata,
  35. promise: EventLoopPromise<Void>?
  36. ) {
  37. self.messages.append(bytes)
  38. self.messageMetadata.append(metadata)
  39. promise?.succeed(())
  40. }
  41. func sendEnd(status: GRPCStatus, trailers: HPACKHeaders, promise: EventLoopPromise<Void>?) {
  42. XCTAssertNil(self.status)
  43. XCTAssertNil(self.trailers)
  44. self.status = status
  45. self.trailers = trailers
  46. promise?.succeed(())
  47. }
  48. }
  49. protocol ServerHandlerTestCase: GRPCTestCase {
  50. var eventLoop: EmbeddedEventLoop { get }
  51. var allocator: ByteBufferAllocator { get }
  52. var recorder: ResponseRecorder { get }
  53. }
  54. extension ServerHandlerTestCase {
  55. func makeCallHandlerContext(encoding: ServerMessageEncoding = .disabled) -> CallHandlerContext {
  56. return CallHandlerContext(
  57. errorDelegate: nil,
  58. logger: self.logger,
  59. encoding: encoding,
  60. eventLoop: self.eventLoop,
  61. path: "/ignored",
  62. remoteAddress: nil,
  63. responseWriter: self.recorder,
  64. allocator: self.allocator,
  65. closeFuture: self.eventLoop.makeSucceededVoidFuture()
  66. )
  67. }
  68. }
  69. // MARK: - Unary
  70. class UnaryServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  71. let eventLoop = EmbeddedEventLoop()
  72. let allocator = ByteBufferAllocator()
  73. let recorder = ResponseRecorder()
  74. private func makeHandler(
  75. encoding: ServerMessageEncoding = .disabled,
  76. function: @escaping (String, StatusOnlyCallContext) -> EventLoopFuture<String>
  77. ) -> UnaryServerHandler<StringSerializer, StringDeserializer> {
  78. return UnaryServerHandler(
  79. context: self.makeCallHandlerContext(encoding: encoding),
  80. requestDeserializer: StringDeserializer(),
  81. responseSerializer: StringSerializer(),
  82. interceptors: [],
  83. userFunction: function
  84. )
  85. }
  86. private func echo(_ request: String, context: StatusOnlyCallContext) -> EventLoopFuture<String> {
  87. return context.eventLoop.makeSucceededFuture(request)
  88. }
  89. private func neverComplete(
  90. _ request: String,
  91. context: StatusOnlyCallContext
  92. ) -> EventLoopFuture<String> {
  93. let scheduled = context.eventLoop.scheduleTask(deadline: .distantFuture) {
  94. return request
  95. }
  96. return scheduled.futureResult
  97. }
  98. private func neverCalled(
  99. _ request: String,
  100. context: StatusOnlyCallContext
  101. ) -> EventLoopFuture<String> {
  102. XCTFail("Unexpected function invocation")
  103. return context.eventLoop.makeFailedFuture(GRPCError.InvalidState(""))
  104. }
  105. func testHappyPath() {
  106. let handler = self.makeHandler(function: self.echo(_:context:))
  107. handler.receiveMetadata([:])
  108. assertThat(self.recorder.metadata, .is([:]))
  109. let buffer = ByteBuffer(string: "hello")
  110. handler.receiveMessage(buffer)
  111. handler.receiveEnd()
  112. handler.finish()
  113. assertThat(self.recorder.messages.first, .is(buffer))
  114. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  115. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  116. assertThat(self.recorder.trailers, .is([:]))
  117. }
  118. func testHappyPathWithCompressionEnabled() {
  119. let handler = self.makeHandler(
  120. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  121. function: self.echo(_:context:)
  122. )
  123. handler.receiveMetadata([:])
  124. let buffer = ByteBuffer(string: "hello")
  125. handler.receiveMessage(buffer)
  126. assertThat(self.recorder.messages.first, .is(buffer))
  127. assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  128. }
  129. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  130. let handler = self.makeHandler(
  131. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  132. ) { request, context in
  133. context.compressionEnabled = false
  134. return self.echo(request, context: context)
  135. }
  136. handler.receiveMetadata([:])
  137. let buffer = ByteBuffer(string: "hello")
  138. handler.receiveMessage(buffer)
  139. assertThat(self.recorder.messages.first, .is(buffer))
  140. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  141. }
  142. func testThrowingDeserializer() {
  143. let handler = UnaryServerHandler(
  144. context: self.makeCallHandlerContext(),
  145. requestDeserializer: ThrowingStringDeserializer(),
  146. responseSerializer: StringSerializer(),
  147. interceptors: [],
  148. userFunction: self.neverCalled(_:context:)
  149. )
  150. handler.receiveMetadata([:])
  151. assertThat(self.recorder.metadata, .is([:]))
  152. let buffer = ByteBuffer(string: "hello")
  153. handler.receiveMessage(buffer)
  154. assertThat(self.recorder.messages, .isEmpty())
  155. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  156. }
  157. func testThrowingSerializer() {
  158. let handler = UnaryServerHandler(
  159. context: self.makeCallHandlerContext(),
  160. requestDeserializer: StringDeserializer(),
  161. responseSerializer: ThrowingStringSerializer(),
  162. interceptors: [],
  163. userFunction: self.echo(_:context:)
  164. )
  165. handler.receiveMetadata([:])
  166. assertThat(self.recorder.metadata, .is([:]))
  167. let buffer = ByteBuffer(string: "hello")
  168. handler.receiveMessage(buffer)
  169. handler.receiveEnd()
  170. assertThat(self.recorder.messages, .isEmpty())
  171. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  172. }
  173. func testUserFunctionReturnsFailedFuture() {
  174. let handler = self.makeHandler { _, context in
  175. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  176. }
  177. handler.receiveMetadata([:])
  178. assertThat(self.recorder.metadata, .is([:]))
  179. let buffer = ByteBuffer(string: "hello")
  180. handler.receiveMessage(buffer)
  181. assertThat(self.recorder.messages, .isEmpty())
  182. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  183. assertThat(self.recorder.status?.message, .is(":("))
  184. }
  185. func testReceiveMessageBeforeHeaders() {
  186. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  187. handler.receiveMessage(ByteBuffer(string: "foo"))
  188. assertThat(self.recorder.metadata, .is(.nil()))
  189. assertThat(self.recorder.messages, .isEmpty())
  190. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  191. }
  192. func testReceiveMultipleHeaders() {
  193. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  194. handler.receiveMetadata([:])
  195. assertThat(self.recorder.metadata, .is([:]))
  196. handler.receiveMetadata([:])
  197. assertThat(self.recorder.messages, .isEmpty())
  198. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  199. }
  200. func testReceiveMultipleMessages() {
  201. let handler = self.makeHandler(function: self.neverComplete(_:context:))
  202. handler.receiveMetadata([:])
  203. assertThat(self.recorder.metadata, .is([:]))
  204. let buffer = ByteBuffer(string: "hello")
  205. handler.receiveMessage(buffer)
  206. handler.receiveEnd()
  207. // Send another message before the function completes.
  208. handler.receiveMessage(buffer)
  209. assertThat(self.recorder.messages, .isEmpty())
  210. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  211. }
  212. func testFinishBeforeStarting() {
  213. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  214. handler.finish()
  215. assertThat(self.recorder.metadata, .is(.nil()))
  216. assertThat(self.recorder.messages, .isEmpty())
  217. assertThat(self.recorder.status, .is(.nil()))
  218. assertThat(self.recorder.trailers, .is(.nil()))
  219. }
  220. func testFinishAfterHeaders() {
  221. let handler = self.makeHandler(function: self.neverCalled(_:context:))
  222. handler.receiveMetadata([:])
  223. assertThat(self.recorder.metadata, .is([:]))
  224. handler.finish()
  225. assertThat(self.recorder.messages, .isEmpty())
  226. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  227. assertThat(self.recorder.trailers, .is([:]))
  228. }
  229. func testFinishAfterMessage() {
  230. let handler = self.makeHandler(function: self.neverComplete(_:context:))
  231. handler.receiveMetadata([:])
  232. handler.receiveMessage(ByteBuffer(string: "hello"))
  233. handler.finish()
  234. assertThat(self.recorder.messages, .isEmpty())
  235. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  236. assertThat(self.recorder.trailers, .is([:]))
  237. }
  238. }
  239. // MARK: - Client Streaming
  240. class ClientStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  241. let eventLoop = EmbeddedEventLoop()
  242. let allocator = ByteBufferAllocator()
  243. let recorder = ResponseRecorder()
  244. private func makeHandler(
  245. encoding: ServerMessageEncoding = .disabled,
  246. observerFactory: @escaping (UnaryResponseCallContext<String>)
  247. -> EventLoopFuture<(StreamEvent<String>) -> Void>
  248. ) -> ClientStreamingServerHandler<StringSerializer, StringDeserializer> {
  249. return ClientStreamingServerHandler(
  250. context: self.makeCallHandlerContext(encoding: encoding),
  251. requestDeserializer: StringDeserializer(),
  252. responseSerializer: StringSerializer(),
  253. interceptors: [],
  254. observerFactory: observerFactory
  255. )
  256. }
  257. private func joinWithSpaces(
  258. context: UnaryResponseCallContext<String>
  259. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  260. var messages: [String] = []
  261. func onEvent(_ event: StreamEvent<String>) {
  262. switch event {
  263. case let .message(message):
  264. messages.append(message)
  265. case .end:
  266. context.responsePromise.succeed(messages.joined(separator: " "))
  267. }
  268. }
  269. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  270. }
  271. private func neverReceivesMessage(
  272. context: UnaryResponseCallContext<String>
  273. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  274. func onEvent(_ event: StreamEvent<String>) {
  275. switch event {
  276. case let .message(message):
  277. XCTFail("Unexpected message: '\(message)'")
  278. case .end:
  279. context.responsePromise.succeed("")
  280. }
  281. }
  282. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  283. }
  284. private func neverCalled(
  285. context: UnaryResponseCallContext<String>
  286. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  287. XCTFail("This observer factory should never be called")
  288. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .aborted, message: nil))
  289. }
  290. func testHappyPath() {
  291. let handler = self.makeHandler(observerFactory: self.joinWithSpaces(context:))
  292. handler.receiveMetadata([:])
  293. assertThat(self.recorder.metadata, .is([:]))
  294. handler.receiveMessage(ByteBuffer(string: "1"))
  295. handler.receiveMessage(ByteBuffer(string: "2"))
  296. handler.receiveMessage(ByteBuffer(string: "3"))
  297. handler.receiveEnd()
  298. handler.finish()
  299. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  300. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  301. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  302. assertThat(self.recorder.trailers, .is([:]))
  303. }
  304. func testHappyPathWithCompressionEnabled() {
  305. let handler = self.makeHandler(
  306. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  307. observerFactory: self.joinWithSpaces(context:)
  308. )
  309. handler.receiveMetadata([:])
  310. handler.receiveMessage(ByteBuffer(string: "1"))
  311. handler.receiveMessage(ByteBuffer(string: "2"))
  312. handler.receiveMessage(ByteBuffer(string: "3"))
  313. handler.receiveEnd()
  314. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  315. assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  316. }
  317. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  318. let handler = self.makeHandler(
  319. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  320. ) { context in
  321. context.compressionEnabled = false
  322. return self.joinWithSpaces(context: context)
  323. }
  324. handler.receiveMetadata([:])
  325. handler.receiveMessage(ByteBuffer(string: "1"))
  326. handler.receiveMessage(ByteBuffer(string: "2"))
  327. handler.receiveMessage(ByteBuffer(string: "3"))
  328. handler.receiveEnd()
  329. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  330. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  331. }
  332. func testThrowingDeserializer() {
  333. let handler = ClientStreamingServerHandler(
  334. context: self.makeCallHandlerContext(),
  335. requestDeserializer: ThrowingStringDeserializer(),
  336. responseSerializer: StringSerializer(),
  337. interceptors: [],
  338. observerFactory: self.neverReceivesMessage(context:)
  339. )
  340. handler.receiveMetadata([:])
  341. assertThat(self.recorder.metadata, .is([:]))
  342. let buffer = ByteBuffer(string: "hello")
  343. handler.receiveMessage(buffer)
  344. assertThat(self.recorder.messages, .isEmpty())
  345. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  346. }
  347. func testThrowingSerializer() {
  348. let handler = ClientStreamingServerHandler(
  349. context: self.makeCallHandlerContext(),
  350. requestDeserializer: StringDeserializer(),
  351. responseSerializer: ThrowingStringSerializer(),
  352. interceptors: [],
  353. observerFactory: self.joinWithSpaces(context:)
  354. )
  355. handler.receiveMetadata([:])
  356. assertThat(self.recorder.metadata, .is([:]))
  357. let buffer = ByteBuffer(string: "hello")
  358. handler.receiveMessage(buffer)
  359. handler.receiveEnd()
  360. assertThat(self.recorder.messages, .isEmpty())
  361. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  362. }
  363. func testObserverFactoryReturnsFailedFuture() {
  364. let handler = self.makeHandler { context in
  365. context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  366. }
  367. handler.receiveMetadata([:])
  368. assertThat(self.recorder.messages, .isEmpty())
  369. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  370. assertThat(self.recorder.status?.message, .is(":("))
  371. }
  372. func testDelayedObserverFactory() {
  373. let promise = self.eventLoop.makePromise(of: Void.self)
  374. let handler = self.makeHandler { context in
  375. return promise.futureResult.flatMap {
  376. self.joinWithSpaces(context: context)
  377. }
  378. }
  379. handler.receiveMetadata([:])
  380. // Queue up some messages.
  381. handler.receiveMessage(ByteBuffer(string: "1"))
  382. handler.receiveMessage(ByteBuffer(string: "2"))
  383. handler.receiveMessage(ByteBuffer(string: "3"))
  384. // Succeed the observer block.
  385. promise.succeed(())
  386. // A few more messages.
  387. handler.receiveMessage(ByteBuffer(string: "4"))
  388. handler.receiveMessage(ByteBuffer(string: "5"))
  389. handler.receiveEnd()
  390. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3 4 5")))
  391. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  392. }
  393. func testDelayedObserverFactoryAllMessagesBeforeSucceeding() {
  394. let promise = self.eventLoop.makePromise(of: Void.self)
  395. let handler = self.makeHandler { context in
  396. return promise.futureResult.flatMap {
  397. self.joinWithSpaces(context: context)
  398. }
  399. }
  400. handler.receiveMetadata([:])
  401. // Queue up some messages.
  402. handler.receiveMessage(ByteBuffer(string: "1"))
  403. handler.receiveMessage(ByteBuffer(string: "2"))
  404. handler.receiveMessage(ByteBuffer(string: "3"))
  405. handler.receiveEnd()
  406. // Succeed the observer block.
  407. promise.succeed(())
  408. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "1 2 3")))
  409. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  410. }
  411. func testReceiveMessageBeforeHeaders() {
  412. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  413. handler.receiveMessage(ByteBuffer(string: "foo"))
  414. assertThat(self.recorder.metadata, .is(.nil()))
  415. assertThat(self.recorder.messages, .isEmpty())
  416. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  417. }
  418. func testReceiveMultipleHeaders() {
  419. let handler = self.makeHandler(observerFactory: self.neverReceivesMessage(context:))
  420. handler.receiveMetadata([:])
  421. assertThat(self.recorder.metadata, .is([:]))
  422. handler.receiveMetadata([:])
  423. assertThat(self.recorder.messages, .isEmpty())
  424. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  425. }
  426. func testFinishBeforeStarting() {
  427. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  428. handler.finish()
  429. assertThat(self.recorder.metadata, .is(.nil()))
  430. assertThat(self.recorder.messages, .isEmpty())
  431. assertThat(self.recorder.status, .is(.nil()))
  432. assertThat(self.recorder.trailers, .is(.nil()))
  433. }
  434. func testFinishAfterHeaders() {
  435. let handler = self.makeHandler(observerFactory: self.joinWithSpaces(context:))
  436. handler.receiveMetadata([:])
  437. assertThat(self.recorder.metadata, .is([:]))
  438. handler.finish()
  439. assertThat(self.recorder.messages, .isEmpty())
  440. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  441. assertThat(self.recorder.trailers, .is([:]))
  442. }
  443. func testFinishAfterMessage() {
  444. let handler = self.makeHandler(observerFactory: self.joinWithSpaces(context:))
  445. handler.receiveMetadata([:])
  446. handler.receiveMessage(ByteBuffer(string: "hello"))
  447. handler.finish()
  448. assertThat(self.recorder.messages, .isEmpty())
  449. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  450. assertThat(self.recorder.trailers, .is([:]))
  451. }
  452. }
  453. class ServerStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  454. let eventLoop = EmbeddedEventLoop()
  455. let allocator = ByteBufferAllocator()
  456. let recorder = ResponseRecorder()
  457. private func makeHandler(
  458. encoding: ServerMessageEncoding = .disabled,
  459. userFunction: @escaping (String, StreamingResponseCallContext<String>)
  460. -> EventLoopFuture<GRPCStatus>
  461. ) -> ServerStreamingServerHandler<StringSerializer, StringDeserializer> {
  462. return ServerStreamingServerHandler(
  463. context: self.makeCallHandlerContext(encoding: encoding),
  464. requestDeserializer: StringDeserializer(),
  465. responseSerializer: StringSerializer(),
  466. interceptors: [],
  467. userFunction: userFunction
  468. )
  469. }
  470. private func breakOnSpaces(
  471. _ request: String,
  472. context: StreamingResponseCallContext<String>
  473. ) -> EventLoopFuture<GRPCStatus> {
  474. let parts = request.components(separatedBy: " ")
  475. context.sendResponses(parts, promise: nil)
  476. return context.eventLoop.makeSucceededFuture(.ok)
  477. }
  478. private func neverCalled(
  479. _ request: String,
  480. context: StreamingResponseCallContext<String>
  481. ) -> EventLoopFuture<GRPCStatus> {
  482. XCTFail("Unexpected invocation")
  483. return context.eventLoop.makeSucceededFuture(.processingError)
  484. }
  485. private func neverComplete(
  486. _ request: String,
  487. context: StreamingResponseCallContext<String>
  488. ) -> EventLoopFuture<GRPCStatus> {
  489. return context.eventLoop.scheduleTask(deadline: .distantFuture) {
  490. return .processingError
  491. }.futureResult
  492. }
  493. func testHappyPath() {
  494. let handler = self.makeHandler(userFunction: self.breakOnSpaces(_:context:))
  495. handler.receiveMetadata([:])
  496. assertThat(self.recorder.metadata, .is([:]))
  497. handler.receiveMessage(ByteBuffer(string: "a b"))
  498. handler.receiveEnd()
  499. handler.finish()
  500. assertThat(
  501. self.recorder.messages,
  502. .is([ByteBuffer(string: "a"), ByteBuffer(string: "b")])
  503. )
  504. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false]))
  505. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  506. assertThat(self.recorder.trailers, .is([:]))
  507. }
  508. func testHappyPathWithCompressionEnabled() {
  509. let handler = self.makeHandler(
  510. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  511. userFunction: self.breakOnSpaces(_:context:)
  512. )
  513. handler.receiveMetadata([:])
  514. handler.receiveMessage(ByteBuffer(string: "a"))
  515. handler.receiveEnd()
  516. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "a")))
  517. assertThat(self.recorder.messageMetadata.first?.compress, .is(true))
  518. }
  519. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  520. let handler = self.makeHandler(
  521. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  522. ) { request, context in
  523. context.compressionEnabled = false
  524. return self.breakOnSpaces(request, context: context)
  525. }
  526. handler.receiveMetadata([:])
  527. handler.receiveMessage(ByteBuffer(string: "a"))
  528. handler.receiveEnd()
  529. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "a")))
  530. assertThat(self.recorder.messageMetadata.first?.compress, .is(false))
  531. }
  532. func testThrowingDeserializer() {
  533. let handler = ServerStreamingServerHandler(
  534. context: self.makeCallHandlerContext(),
  535. requestDeserializer: ThrowingStringDeserializer(),
  536. responseSerializer: StringSerializer(),
  537. interceptors: [],
  538. userFunction: self.neverCalled(_:context:)
  539. )
  540. handler.receiveMetadata([:])
  541. assertThat(self.recorder.metadata, .is([:]))
  542. let buffer = ByteBuffer(string: "hello")
  543. handler.receiveMessage(buffer)
  544. assertThat(self.recorder.messages, .isEmpty())
  545. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  546. }
  547. func testThrowingSerializer() {
  548. let handler = ServerStreamingServerHandler(
  549. context: self.makeCallHandlerContext(),
  550. requestDeserializer: StringDeserializer(),
  551. responseSerializer: ThrowingStringSerializer(),
  552. interceptors: [],
  553. userFunction: self.breakOnSpaces(_:context:)
  554. )
  555. handler.receiveMetadata([:])
  556. assertThat(self.recorder.metadata, .is([:]))
  557. let buffer = ByteBuffer(string: "1 2 3")
  558. handler.receiveMessage(buffer)
  559. handler.receiveEnd()
  560. assertThat(self.recorder.messages, .isEmpty())
  561. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  562. }
  563. func testUserFunctionReturnsFailedFuture() {
  564. let handler = self.makeHandler { _, context in
  565. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  566. }
  567. handler.receiveMetadata([:])
  568. assertThat(self.recorder.metadata, .is([:]))
  569. let buffer = ByteBuffer(string: "hello")
  570. handler.receiveMessage(buffer)
  571. assertThat(self.recorder.messages, .isEmpty())
  572. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  573. assertThat(self.recorder.status?.message, .is(":("))
  574. }
  575. func testReceiveMessageBeforeHeaders() {
  576. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  577. handler.receiveMessage(ByteBuffer(string: "foo"))
  578. assertThat(self.recorder.metadata, .is(.nil()))
  579. assertThat(self.recorder.messages, .isEmpty())
  580. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  581. }
  582. func testReceiveMultipleHeaders() {
  583. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  584. handler.receiveMetadata([:])
  585. assertThat(self.recorder.metadata, .is([:]))
  586. handler.receiveMetadata([:])
  587. assertThat(self.recorder.messages, .isEmpty())
  588. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  589. }
  590. func testReceiveMultipleMessages() {
  591. let handler = self.makeHandler(userFunction: self.neverComplete(_:context:))
  592. handler.receiveMetadata([:])
  593. assertThat(self.recorder.metadata, .is([:]))
  594. let buffer = ByteBuffer(string: "hello")
  595. handler.receiveMessage(buffer)
  596. handler.receiveEnd()
  597. // Send another message before the function completes.
  598. handler.receiveMessage(buffer)
  599. assertThat(self.recorder.messages, .isEmpty())
  600. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  601. }
  602. func testFinishBeforeStarting() {
  603. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  604. handler.finish()
  605. assertThat(self.recorder.metadata, .is(.nil()))
  606. assertThat(self.recorder.messages, .isEmpty())
  607. assertThat(self.recorder.status, .is(.nil()))
  608. assertThat(self.recorder.trailers, .is(.nil()))
  609. }
  610. func testFinishAfterHeaders() {
  611. let handler = self.makeHandler(userFunction: self.neverCalled(_:context:))
  612. handler.receiveMetadata([:])
  613. assertThat(self.recorder.metadata, .is([:]))
  614. handler.finish()
  615. assertThat(self.recorder.messages, .isEmpty())
  616. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  617. assertThat(self.recorder.trailers, .is([:]))
  618. }
  619. func testFinishAfterMessage() {
  620. let handler = self.makeHandler(userFunction: self.neverComplete(_:context:))
  621. handler.receiveMetadata([:])
  622. handler.receiveMessage(ByteBuffer(string: "hello"))
  623. handler.finish()
  624. assertThat(self.recorder.messages, .isEmpty())
  625. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  626. assertThat(self.recorder.trailers, .is([:]))
  627. }
  628. }
  629. // MARK: - Bidirectional Streaming
  630. class BidirectionalStreamingServerHandlerTests: GRPCTestCase, ServerHandlerTestCase {
  631. let eventLoop = EmbeddedEventLoop()
  632. let allocator = ByteBufferAllocator()
  633. let recorder = ResponseRecorder()
  634. private func makeHandler(
  635. encoding: ServerMessageEncoding = .disabled,
  636. observerFactory: @escaping (StreamingResponseCallContext<String>)
  637. -> EventLoopFuture<(StreamEvent<String>) -> Void>
  638. ) -> BidirectionalStreamingServerHandler<StringSerializer, StringDeserializer> {
  639. return BidirectionalStreamingServerHandler(
  640. context: self.makeCallHandlerContext(encoding: encoding),
  641. requestDeserializer: StringDeserializer(),
  642. responseSerializer: StringSerializer(),
  643. interceptors: [],
  644. observerFactory: observerFactory
  645. )
  646. }
  647. private func echo(
  648. context: StreamingResponseCallContext<String>
  649. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  650. func onEvent(_ event: StreamEvent<String>) {
  651. switch event {
  652. case let .message(message):
  653. context.sendResponse(message, promise: nil)
  654. case .end:
  655. context.statusPromise.succeed(.ok)
  656. }
  657. }
  658. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  659. }
  660. private func neverReceivesMessage(
  661. context: StreamingResponseCallContext<String>
  662. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  663. func onEvent(_ event: StreamEvent<String>) {
  664. switch event {
  665. case let .message(message):
  666. XCTFail("Unexpected message: '\(message)'")
  667. case .end:
  668. context.statusPromise.succeed(.ok)
  669. }
  670. }
  671. return context.eventLoop.makeSucceededFuture(onEvent(_:))
  672. }
  673. private func neverCalled(
  674. context: StreamingResponseCallContext<String>
  675. ) -> EventLoopFuture<(StreamEvent<String>) -> Void> {
  676. XCTFail("This observer factory should never be called")
  677. return context.eventLoop.makeFailedFuture(GRPCStatus(code: .aborted, message: nil))
  678. }
  679. func testHappyPath() {
  680. let handler = self.makeHandler(observerFactory: self.echo(context:))
  681. handler.receiveMetadata([:])
  682. assertThat(self.recorder.metadata, .is([:]))
  683. handler.receiveMessage(ByteBuffer(string: "1"))
  684. handler.receiveMessage(ByteBuffer(string: "2"))
  685. handler.receiveMessage(ByteBuffer(string: "3"))
  686. handler.receiveEnd()
  687. handler.finish()
  688. assertThat(
  689. self.recorder.messages,
  690. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
  691. )
  692. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false]))
  693. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  694. assertThat(self.recorder.trailers, .is([:]))
  695. }
  696. func testHappyPathWithCompressionEnabled() {
  697. let handler = self.makeHandler(
  698. encoding: .enabled(.init(decompressionLimit: .absolute(.max))),
  699. observerFactory: self.echo(context:)
  700. )
  701. handler.receiveMetadata([:])
  702. handler.receiveMessage(ByteBuffer(string: "1"))
  703. handler.receiveMessage(ByteBuffer(string: "2"))
  704. handler.receiveMessage(ByteBuffer(string: "3"))
  705. handler.receiveEnd()
  706. assertThat(
  707. self.recorder.messages,
  708. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
  709. )
  710. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([true, true, true]))
  711. }
  712. func testHappyPathWithCompressionEnabledButDisabledByCaller() {
  713. let handler = self.makeHandler(
  714. encoding: .enabled(.init(decompressionLimit: .absolute(.max)))
  715. ) { context in
  716. context.compressionEnabled = false
  717. return self.echo(context: context)
  718. }
  719. handler.receiveMetadata([:])
  720. handler.receiveMessage(ByteBuffer(string: "1"))
  721. handler.receiveMessage(ByteBuffer(string: "2"))
  722. handler.receiveMessage(ByteBuffer(string: "3"))
  723. handler.receiveEnd()
  724. assertThat(
  725. self.recorder.messages,
  726. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")])
  727. )
  728. assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false]))
  729. }
  730. func testThrowingDeserializer() {
  731. let handler = BidirectionalStreamingServerHandler(
  732. context: self.makeCallHandlerContext(),
  733. requestDeserializer: ThrowingStringDeserializer(),
  734. responseSerializer: StringSerializer(),
  735. interceptors: [],
  736. observerFactory: self.neverReceivesMessage(context:)
  737. )
  738. handler.receiveMetadata([:])
  739. assertThat(self.recorder.metadata, .is([:]))
  740. let buffer = ByteBuffer(string: "hello")
  741. handler.receiveMessage(buffer)
  742. assertThat(self.recorder.messages, .isEmpty())
  743. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  744. }
  745. func testThrowingSerializer() {
  746. let handler = BidirectionalStreamingServerHandler(
  747. context: self.makeCallHandlerContext(),
  748. requestDeserializer: StringDeserializer(),
  749. responseSerializer: ThrowingStringSerializer(),
  750. interceptors: [],
  751. observerFactory: self.echo(context:)
  752. )
  753. handler.receiveMetadata([:])
  754. assertThat(self.recorder.metadata, .is([:]))
  755. let buffer = ByteBuffer(string: "hello")
  756. handler.receiveMessage(buffer)
  757. handler.receiveEnd()
  758. assertThat(self.recorder.messages, .isEmpty())
  759. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  760. }
  761. func testObserverFactoryReturnsFailedFuture() {
  762. let handler = self.makeHandler { context in
  763. context.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: ":("))
  764. }
  765. handler.receiveMetadata([:])
  766. assertThat(self.recorder.messages, .isEmpty())
  767. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  768. assertThat(self.recorder.status?.message, .is(":("))
  769. }
  770. func testDelayedObserverFactory() {
  771. let promise = self.eventLoop.makePromise(of: Void.self)
  772. let handler = self.makeHandler { context in
  773. return promise.futureResult.flatMap {
  774. self.echo(context: context)
  775. }
  776. }
  777. handler.receiveMetadata([:])
  778. // Queue up some messages.
  779. handler.receiveMessage(ByteBuffer(string: "1"))
  780. // Succeed the observer block.
  781. promise.succeed(())
  782. // A few more messages.
  783. handler.receiveMessage(ByteBuffer(string: "2"))
  784. handler.receiveEnd()
  785. assertThat(
  786. self.recorder.messages,
  787. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2")])
  788. )
  789. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  790. }
  791. func testDelayedObserverFactoryAllMessagesBeforeSucceeding() {
  792. let promise = self.eventLoop.makePromise(of: Void.self)
  793. let handler = self.makeHandler { context in
  794. return promise.futureResult.flatMap {
  795. self.echo(context: context)
  796. }
  797. }
  798. handler.receiveMetadata([:])
  799. // Queue up some messages.
  800. handler.receiveMessage(ByteBuffer(string: "1"))
  801. handler.receiveMessage(ByteBuffer(string: "2"))
  802. handler.receiveEnd()
  803. // Succeed the observer block.
  804. promise.succeed(())
  805. assertThat(
  806. self.recorder.messages,
  807. .is([ByteBuffer(string: "1"), ByteBuffer(string: "2")])
  808. )
  809. assertThat(self.recorder.status, .notNil(.hasCode(.ok)))
  810. }
  811. func testReceiveMessageBeforeHeaders() {
  812. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  813. handler.receiveMessage(ByteBuffer(string: "foo"))
  814. assertThat(self.recorder.metadata, .is(.nil()))
  815. assertThat(self.recorder.messages, .isEmpty())
  816. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  817. }
  818. func testReceiveMultipleHeaders() {
  819. let handler = self.makeHandler(observerFactory: self.neverReceivesMessage(context:))
  820. handler.receiveMetadata([:])
  821. assertThat(self.recorder.metadata, .is([:]))
  822. handler.receiveMetadata([:])
  823. assertThat(self.recorder.messages, .isEmpty())
  824. assertThat(self.recorder.status, .notNil(.hasCode(.internalError)))
  825. }
  826. func testFinishBeforeStarting() {
  827. let handler = self.makeHandler(observerFactory: self.neverCalled(context:))
  828. handler.finish()
  829. assertThat(self.recorder.metadata, .is(.nil()))
  830. assertThat(self.recorder.messages, .isEmpty())
  831. assertThat(self.recorder.status, .is(.nil()))
  832. assertThat(self.recorder.trailers, .is(.nil()))
  833. }
  834. func testFinishAfterHeaders() {
  835. let handler = self.makeHandler(observerFactory: self.echo(context:))
  836. handler.receiveMetadata([:])
  837. assertThat(self.recorder.metadata, .is([:]))
  838. handler.finish()
  839. assertThat(self.recorder.messages, .isEmpty())
  840. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  841. assertThat(self.recorder.trailers, .is([:]))
  842. }
  843. func testFinishAfterMessage() {
  844. let handler = self.makeHandler(observerFactory: self.echo(context:))
  845. handler.receiveMetadata([:])
  846. handler.receiveMessage(ByteBuffer(string: "hello"))
  847. handler.finish()
  848. assertThat(self.recorder.messages.first, .is(ByteBuffer(string: "hello")))
  849. assertThat(self.recorder.status, .notNil(.hasCode(.unavailable)))
  850. assertThat(self.recorder.trailers, .is([:]))
  851. }
  852. }