ServerInterceptorPipelineTests.swift 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOConcurrencyHelpers
  17. import NIOCore
  18. import NIOEmbedded
  19. import NIOHPACK
  20. import XCTest
  21. @testable import GRPC
  22. class ServerInterceptorPipelineTests: GRPCTestCase {
  23. override func setUp() {
  24. super.setUp()
  25. self.embeddedEventLoop = EmbeddedEventLoop()
  26. }
  27. private var embeddedEventLoop: EmbeddedEventLoop!
  28. private func makePipeline<Request, Response>(
  29. requests: Request.Type = Request.self,
  30. responses: Response.Type = Response.self,
  31. path: String = "/foo/bar",
  32. callType: GRPCCallType = .unary,
  33. interceptors: [ServerInterceptor<Request, Response>] = [],
  34. onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void,
  35. onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
  36. ) -> ServerInterceptorPipeline<Request, Response> {
  37. return ServerInterceptorPipeline(
  38. logger: self.logger,
  39. eventLoop: self.embeddedEventLoop,
  40. path: path,
  41. callType: callType,
  42. remoteAddress: nil,
  43. userInfoRef: Ref(UserInfo()),
  44. closeFuture: self.embeddedEventLoop.makeSucceededVoidFuture(),
  45. interceptors: interceptors,
  46. onRequestPart: onRequestPart,
  47. onResponsePart: onResponsePart
  48. )
  49. }
  50. func testEmptyPipeline() {
  51. var requestParts: [GRPCServerRequestPart<String>] = []
  52. var responseParts: [GRPCServerResponsePart<String>] = []
  53. let pipeline = self.makePipeline(
  54. requests: String.self,
  55. responses: String.self,
  56. onRequestPart: { requestParts.append($0) },
  57. onResponsePart: { part, promise in
  58. responseParts.append(part)
  59. assertThat(promise, .is(.none()))
  60. }
  61. )
  62. pipeline.receive(.metadata([:]))
  63. pipeline.receive(.message("foo"))
  64. pipeline.receive(.end)
  65. assertThat(requestParts, .hasCount(3))
  66. assertThat(requestParts[0].metadata, .is([:]))
  67. assertThat(requestParts[1].message, .is("foo"))
  68. assertThat(requestParts[2].isEnd, .is(true))
  69. pipeline.send(.metadata([:]), promise: nil)
  70. pipeline.send(.message("bar", .init(compress: false, flush: false)), promise: nil)
  71. pipeline.send(.end(.ok, [:]), promise: nil)
  72. assertThat(responseParts, .hasCount(3))
  73. assertThat(responseParts[0].metadata, .is([:]))
  74. assertThat(responseParts[1].message, .is("bar"))
  75. assertThat(responseParts[2].end, .is(.some()))
  76. // Pipelines should now be closed. We can't send or receive.
  77. let p = self.embeddedEventLoop.makePromise(of: Void.self)
  78. pipeline.send(.metadata([:]), promise: p)
  79. assertThat(try p.futureResult.wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
  80. responseParts.removeAll()
  81. pipeline.receive(.end)
  82. assertThat(responseParts, .isEmpty())
  83. }
  84. func testRecordingPipeline() {
  85. let recorder = RecordingServerInterceptor<String, String>()
  86. let pipeline = self.makePipeline(
  87. interceptors: [recorder],
  88. onRequestPart: { _ in },
  89. onResponsePart: { _, _ in }
  90. )
  91. pipeline.receive(.metadata([:]))
  92. pipeline.receive(.message("foo"))
  93. pipeline.receive(.end)
  94. pipeline.send(.metadata([:]), promise: nil)
  95. pipeline.send(.message("bar", .init(compress: false, flush: false)), promise: nil)
  96. pipeline.send(.end(.ok, [:]), promise: nil)
  97. // Check the request parts are there.
  98. assertThat(recorder.requestParts, .hasCount(3))
  99. assertThat(recorder.requestParts[0].metadata, .is(.some()))
  100. assertThat(recorder.requestParts[1].message, .is(.some()))
  101. assertThat(recorder.requestParts[2].isEnd, .is(true))
  102. // Check the response parts are there.
  103. assertThat(recorder.responseParts, .hasCount(3))
  104. assertThat(recorder.responseParts[0].metadata, .is(.some()))
  105. assertThat(recorder.responseParts[1].message, .is(.some()))
  106. assertThat(recorder.responseParts[2].end, .is(.some()))
  107. }
  108. }
  109. internal class RecordingServerInterceptor<Request, Response>:
  110. ServerInterceptor<Request, Response>, @unchecked Sendable
  111. {
  112. private let lock = NIOLock()
  113. private var _requestParts: [GRPCServerRequestPart<Request>] = []
  114. private var _responseParts: [GRPCServerResponsePart<Response>] = []
  115. var requestParts: [GRPCServerRequestPart<Request>] {
  116. self.lock.withLock { self._requestParts }
  117. }
  118. var responseParts: [GRPCServerResponsePart<Response>] {
  119. self.lock.withLock { self._responseParts }
  120. }
  121. override func receive(
  122. _ part: GRPCServerRequestPart<Request>,
  123. context: ServerInterceptorContext<Request, Response>
  124. ) {
  125. self.lock.withLock { self._requestParts.append(part) }
  126. context.receive(part)
  127. }
  128. override func send(
  129. _ part: GRPCServerResponsePart<Response>,
  130. promise: EventLoopPromise<Void>?,
  131. context: ServerInterceptorContext<Request, Response>
  132. ) {
  133. self.lock.withLock { self._responseParts.append(part) }
  134. context.send(part, promise: promise)
  135. }
  136. }
  137. extension GRPCServerRequestPart {
  138. var metadata: HPACKHeaders? {
  139. switch self {
  140. case let .metadata(metadata):
  141. return metadata
  142. default:
  143. return nil
  144. }
  145. }
  146. var message: Request? {
  147. switch self {
  148. case let .message(message):
  149. return message
  150. default:
  151. return nil
  152. }
  153. }
  154. var isEnd: Bool {
  155. switch self {
  156. case .end:
  157. return true
  158. default:
  159. return false
  160. }
  161. }
  162. }
  163. extension GRPCServerResponsePart {
  164. var metadata: HPACKHeaders? {
  165. switch self {
  166. case let .metadata(metadata):
  167. return metadata
  168. default:
  169. return nil
  170. }
  171. }
  172. var message: Response? {
  173. switch self {
  174. case let .message(message, _):
  175. return message
  176. default:
  177. return nil
  178. }
  179. }
  180. var end: (GRPCStatus, HPACKHeaders)? {
  181. switch self {
  182. case let .end(status, trailers):
  183. return (status, trailers)
  184. default:
  185. return nil
  186. }
  187. }
  188. }