ServerInterceptorPipelineTests.swift 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. @testable import GRPC
  17. import NIO
  18. import NIOHPACK
  19. import XCTest
  /// Tests for `ServerInterceptorPipeline`, run against an `EmbeddedEventLoop`.
  class ServerInterceptorPipelineTests: GRPCTestCase {
    /// Event loop handed to every pipeline built by `makePipeline`; replaced with a
    /// fresh instance in `setUp()`.
    private var embeddedEventLoop: EmbeddedEventLoop!

    override func setUp() {
      super.setUp()
      self.embeddedEventLoop = EmbeddedEventLoop()
    }

    /// Builds a `ServerInterceptorPipeline` using the test case's logger and event loop.
    ///
    /// - Parameters:
    ///   - requests: The request type; only used to pin the `Request` generic parameter.
    ///   - responses: The response type; only used to pin the `Response` generic parameter.
    ///   - path: The path passed to the pipeline.
    ///   - callType: The call type passed to the pipeline.
    ///   - interceptors: The interceptors passed to the pipeline.
    ///   - onRequestPart: Callback passed to the pipeline for request parts.
    ///   - onResponsePart: Callback passed to the pipeline for response parts and their
    ///     optional promises.
    /// - Returns: The newly created pipeline.
    private func makePipeline<Request, Response>(
      requests: Request.Type = Request.self,
      responses: Response.Type = Response.self,
      path: String = "/foo/bar",
      callType: GRPCCallType = .unary,
      interceptors: [ServerInterceptor<Request, Response>] = [],
      onRequestPart: @escaping (GRPCServerRequestPart<Request>) -> Void,
      onResponsePart: @escaping (GRPCServerResponsePart<Response>, EventLoopPromise<Void>?) -> Void
    ) -> ServerInterceptorPipeline<Request, Response> {
      return ServerInterceptorPipeline(
        logger: self.logger,
        eventLoop: self.embeddedEventLoop,
        path: path,
        callType: callType,
        interceptors: interceptors,
        onRequestPart: onRequestPart,
        onResponsePart: onResponsePart
      )
    }

    /// With no interceptors, every part should reach the matching callback, and nothing
    /// should be delivered once both directions have seen their end part.
    func testEmptyPipeline() {
      var requestParts: [GRPCServerRequestPart<String>] = []
      var responseParts: [GRPCServerResponsePart<String>] = []

      let pipeline = self.makePipeline(
        requests: String.self,
        responses: String.self,
        onRequestPart: { requestParts.append($0) },
        onResponsePart: { part, promise in
          responseParts.append(part)
          // Every send below passes a nil promise, so none should arrive here.
          assertThat(promise, .is(.nil()))
        }
      )

      pipeline.receive(.metadata([:]))
      pipeline.receive(.message("foo"))
      pipeline.receive(.end)

      assertThat(requestParts, .hasCount(3))
      assertThat(requestParts[0].metadata, .is([:]))
      assertThat(requestParts[1].message, .is("foo"))
      assertThat(requestParts[2].isEnd, .is(true))

      pipeline.send(.metadata([:]), promise: nil)
      pipeline.send(.message("bar", .init(compress: false, flush: false)), promise: nil)
      pipeline.send(.end(.ok, [:]), promise: nil)

      assertThat(responseParts, .hasCount(3))
      assertThat(responseParts[0].metadata, .is([:]))
      assertThat(responseParts[1].message, .is("bar"))
      assertThat(responseParts[2].end, .is(.notNil()))

      // The pipeline should now be closed: parts can no longer be sent or received.
      let sendPromise = self.embeddedEventLoop.makePromise(of: Void.self)
      pipeline.send(.metadata([:]), promise: sendPromise)
      assertThat(
        try sendPromise.futureResult.wait(),
        .throws(.instanceOf(GRPCError.AlreadyComplete.self))
      )

      // Received parts are reported through `onRequestPart`, so `requestParts` is the
      // collection that proves nothing was delivered. Both recorders are cleared first
      // so that any part arriving after this point is visible.
      requestParts.removeAll()
      responseParts.removeAll()
      pipeline.receive(.end)
      assertThat(requestParts, .isEmpty())
      assertThat(responseParts, .isEmpty())
    }

    /// An interceptor installed in the pipeline should observe every request part and
    /// every response part.
    func testRecordingPipeline() {
      let recorder = RecordingServerInterceptor<String, String>()
      let pipeline = self.makePipeline(
        interceptors: [recorder],
        onRequestPart: { _ in },
        onResponsePart: { _, _ in }
      )

      pipeline.receive(.metadata([:]))
      pipeline.receive(.message("foo"))
      pipeline.receive(.end)

      pipeline.send(.metadata([:]), promise: nil)
      pipeline.send(.message("bar", .init(compress: false, flush: false)), promise: nil)
      pipeline.send(.end(.ok, [:]), promise: nil)

      // Check the request parts are there.
      assertThat(recorder.requestParts, .hasCount(3))
      assertThat(recorder.requestParts[0].metadata, .is(.notNil()))
      assertThat(recorder.requestParts[1].message, .is(.notNil()))
      assertThat(recorder.requestParts[2].isEnd, .is(true))

      // Check the response parts are there.
      assertThat(recorder.responseParts, .hasCount(3))
      assertThat(recorder.responseParts[0].metadata, .is(.notNil()))
      assertThat(recorder.responseParts[1].message, .is(.notNil()))
      assertThat(recorder.responseParts[2].end, .is(.notNil()))
    }
  }
  /// A `ServerInterceptor` which keeps a copy of every part it sees, in arrival order,
  /// and then passes the part on through its context.
  internal class RecordingServerInterceptor<Request, Response>:
    ServerInterceptor<Request, Response> {
    /// Request parts handed to `receive(_:context:)`, oldest first.
    var requestParts: [GRPCServerRequestPart<Request>] = []

    /// Response parts handed to `send(_:promise:context:)`, oldest first.
    var responseParts: [GRPCServerResponsePart<Response>] = []

    /// Stores the request part, then forwards it via `context.receive`.
    override func receive(
      _ part: GRPCServerRequestPart<Request>,
      context: ServerInterceptorContext<Request, Response>
    ) {
      requestParts.append(part)
      context.receive(part)
    }

    /// Stores the response part, then forwards it and its promise via `context.send`.
    override func send(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?,
      context: ServerInterceptorContext<Request, Response>
    ) {
      responseParts.append(part)
      context.send(part, promise: promise)
    }
  }
  /// Test-only accessors for pulling the payload out of a request part.
  extension GRPCServerRequestPart {
    /// The headers carried by a `.metadata` part; `nil` for any other case.
    var metadata: HPACKHeaders? {
      guard case let .metadata(headers) = self else {
        return nil
      }
      return headers
    }

    /// The request carried by a `.message` part; `nil` for any other case.
    var message: Request? {
      guard case let .message(request) = self else {
        return nil
      }
      return request
    }

    /// `true` only when this part is `.end`.
    var isEnd: Bool {
      if case .end = self {
        return true
      }
      return false
    }
  }
  /// Test-only accessors for pulling the payload out of a response part.
  extension GRPCServerResponsePart {
    /// The headers carried by a `.metadata` part; `nil` for any other case.
    var metadata: HPACKHeaders? {
      guard case let .metadata(headers) = self else {
        return nil
      }
      return headers
    }

    /// The response carried by a `.message` part (its second associated value is
    /// ignored); `nil` for any other case.
    var message: Response? {
      guard case let .message(response, _) = self else {
        return nil
      }
      return response
    }

    /// The status and trailers carried by an `.end` part; `nil` for any other case.
    var end: (GRPCStatus, HPACKHeaders)? {
      guard case let .end(status, trailers) = self else {
        return nil
      }
      return (status, trailers)
    }
  }
  175. }