TestServiceProvider.swift 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPC
  17. import GRPCInteroperabilityTestModels
  18. import NIOCore
  19. import struct Foundation.Data
  /// A service provider for the gRPC interoperability test suite.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#server
  public class TestServiceProvider: Grpc_Testing_TestServiceProvider {
    /// Optional interceptor factory for this service.
    public var interceptors: Grpc_Testing_TestServiceServerInterceptorFactoryProtocol?

    /// Creates a provider with no interceptors.
    public init() {}

    /// Status used to fail calls which ask for metadata to be echoed back.
    private static let echoMetadataNotImplemented = GRPCStatus(
      code: .unimplemented,
      message: "Echoing metadata is not yet supported"
    )

    /// Status used to fail calls which ask for a payload with a negative size.
    private static let negativeResponseSize = GRPCStatus(
      code: .invalidArgument,
      message: "Requested response size must not be negative"
    )

    /// Creates a zero-filled payload body of `size` bytes.
    ///
    /// The size comes straight from the client, so it is validated rather than trusted:
    /// passing a negative count to `Data(repeating:count:)` would trap and bring the server down.
    ///
    /// - Parameter size: The requested body size in bytes.
    /// - Returns: The zero-filled body, or `nil` if `size` is negative or does not fit in an `Int`.
    private static func makeZeroedBody<Size: BinaryInteger>(size: Size) -> Data? {
      guard let count = Int(exactly: size), count >= 0 else {
        return nil
      }
      return Data(repeating: 0, count: count)
    }

    /// Features that this server implements.
    ///
    /// Some 'features' are methods, whilst others optionally modify the outcome of those methods. The
    /// specification is not explicit about where these modifying features should be implemented (i.e.
    /// which methods should support them) and they are not listed in the individual method
    /// descriptions. As such implementation of these modifying features within each method is
    /// determined by the features required by each test.
    public static var implementedFeatures: Set<ServerFeature> {
      return [
        .emptyCall,
        .unaryCall,
        .streamingOutputCall,
        .streamingInputCall,
        .fullDuplexCall,
        .echoStatus,
        .compressedResponse,
        .compressedRequest,
      ]
    }

    /// Server implements `emptyCall` which immediately returns the empty message.
    public func emptyCall(
      request: Grpc_Testing_Empty,
      context: StatusOnlyCallContext
    ) -> EventLoopFuture<Grpc_Testing_Empty> {
      return context.eventLoop.makeSucceededFuture(Grpc_Testing_Empty())
    }

    /// Server implements `unaryCall` which immediately returns a `SimpleResponse` with a payload
    /// body of size `SimpleRequest.responseSize` bytes and type as appropriate for the
    /// `SimpleRequest.responseType`.
    ///
    /// If the server does not support the `responseType`, then it should fail the RPC with
    /// `INVALID_ARGUMENT`. The RPC is also failed with `INVALID_ARGUMENT` if the requested
    /// response size is negative.
    public func unaryCall(
      request: Grpc_Testing_SimpleRequest,
      context: StatusOnlyCallContext
    ) -> EventLoopFuture<Grpc_Testing_SimpleResponse> {
      // We can't validate messages at the wire-encoding layer (i.e. where the compression byte is
      // set), so we have to check via the encoding header. Note that it is possible for the header
      // to be set and for the message to not be compressed.
      if request.expectCompressed.value, !context.headers.contains(name: "grpc-encoding") {
        let status = GRPCStatus(
          code: .invalidArgument,
          message: "Expected compressed request, but 'grpc-encoding' was missing"
        )
        return context.eventLoop.makeFailedFuture(status)
      }

      // Should we enable compression? The C++ interoperability client only expects compression if
      // explicitly requested; we'll do the same.
      context.compressionEnabled = request.responseCompressed.value

      if request.shouldEchoStatus {
        let code = GRPCStatus.Code(rawValue: numericCast(request.responseStatus.code)) ?? .unknown
        return context.eventLoop
          .makeFailedFuture(GRPCStatus(code: code, message: request.responseStatus.message))
      }

      if context.headers.shouldEchoMetadata {
        return context.eventLoop.makeFailedFuture(TestServiceProvider.echoMetadataNotImplemented)
      }

      if case .UNRECOGNIZED = request.responseType {
        return context.eventLoop.makeFailedFuture(GRPCStatus(code: .invalidArgument, message: nil))
      }

      // The size is client-controlled: reject invalid values instead of trapping.
      guard let body = TestServiceProvider.makeZeroedBody(size: request.responseSize) else {
        return context.eventLoop.makeFailedFuture(TestServiceProvider.negativeResponseSize)
      }

      let response = Grpc_Testing_SimpleResponse.with { response in
        response.payload = Grpc_Testing_Payload.with { payload in
          payload.body = body
          payload.type = request.responseType
        }
      }

      return context.eventLoop.makeSucceededFuture(response)
    }

    /// Server gets the default `SimpleRequest` proto as the request. The content of the request is
    /// ignored. It returns the `SimpleResponse` proto with the payload set to current timestamp.
    /// The timestamp is an integer representing current time with nanosecond resolution. This
    /// integer is formatted as ASCII decimal in the response. The format is not really important as
    /// long as the response payload is different for each request. In addition it adds cache control
    /// headers such that the response can be cached by proxies in the response path. Server should
    /// be behind a caching proxy for this test to pass. Currently we set the max-age to 60 seconds.
    ///
    /// This implementation always fails the call with `UNIMPLEMENTED`.
    public func cacheableUnaryCall(
      request: Grpc_Testing_SimpleRequest,
      context: StatusOnlyCallContext
    ) -> EventLoopFuture<Grpc_Testing_SimpleResponse> {
      let status = GRPCStatus(
        code: .unimplemented,
        message: "'cacheableUnaryCall' requires control of the initial metadata which isn't supported"
      )
      return context.eventLoop.makeFailedFuture(status)
    }

    /// Server implements `streamingOutputCall` by replying, in order, with one
    /// `StreamingOutputCallResponse` for each `ResponseParameter`s in `StreamingOutputCallRequest`.
    /// Each `StreamingOutputCallResponse` should have a payload body of size `ResponseParameter.size`
    /// bytes, as specified by its respective `ResponseParameter`. After sending all responses, it
    /// closes with OK.
    ///
    /// If any `ResponseParameter.size` is negative the call fails with `INVALID_ARGUMENT` before
    /// any response is sent.
    public func streamingOutputCall(
      request: Grpc_Testing_StreamingOutputCallRequest,
      context: StreamingResponseCallContext<Grpc_Testing_StreamingOutputCallResponse>
    ) -> EventLoopFuture<GRPCStatus> {
      // Sizes are client-controlled: validate them all up front so that a bad parameter fails the
      // call cleanly rather than trapping part-way through the response stream.
      guard !request.responseParameters.contains(where: { $0.size < 0 }) else {
        return context.eventLoop.makeFailedFuture(TestServiceProvider.negativeResponseSize)
      }

      // Chain the sends so that each response is only written once the previous one completed.
      var responseQueue = context.eventLoop.makeSucceededFuture(())

      for responseParameter in request.responseParameters {
        responseQueue = responseQueue.flatMap {
          let response = Grpc_Testing_StreamingOutputCallResponse.with { response in
            response.payload = Grpc_Testing_Payload.with { payload in
              payload.body = Data(repeating: 0, count: numericCast(responseParameter.size))
            }
          }

          // Should we enable compression? The C++ interoperability client only expects compression if
          // explicitly requested; we'll do the same.
          let compression: Compression = responseParameter.compressed.value ? .enabled : .disabled
          return context.sendResponse(response, compression: compression)
        }
      }

      return responseQueue.map { GRPCStatus.ok }
    }

    /// Server implements `streamingInputCall` which upon half close immediately returns a
    /// `StreamingInputCallResponse` where `aggregatedPayloadSize` is the sum of all request payload
    /// bodies received.
    public func streamingInputCall(
      context: UnaryResponseCallContext<Grpc_Testing_StreamingInputCallResponse>
    ) -> EventLoopFuture<(StreamEvent<Grpc_Testing_StreamingInputCallRequest>) -> Void> {
      // Running total of the payload body sizes seen so far; captured by the handler below.
      var aggregatePayloadSize = 0

      return context.eventLoop.makeSucceededFuture({ event in
        switch event {
        case let .message(request):
          // As in `unaryCall`, compression can only be checked via the encoding header.
          if request.expectCompressed.value,
            !context.headers.contains(name: "grpc-encoding")
          {
            context.responseStatus = GRPCStatus(
              code: .invalidArgument,
              message: "Expected compressed request, but 'grpc-encoding' was missing"
            )
            context.responsePromise.fail(context.responseStatus)
          } else {
            aggregatePayloadSize += request.payload.body.count
          }

        case .end:
          context.responsePromise.succeed(
            Grpc_Testing_StreamingInputCallResponse.with { response in
              response.aggregatedPayloadSize = numericCast(aggregatePayloadSize)
            }
          )
        }
      })
    }

    /// Server implements `fullDuplexCall` by replying, in order, with one
    /// `StreamingOutputCallResponse` for each `ResponseParameter`s in each
    /// `StreamingOutputCallRequest`. Each `StreamingOutputCallResponse` should have a payload body
    /// of size `ResponseParameter.size` bytes, as specified by its respective `ResponseParameter`s.
    /// After receiving half close and sending all responses, it closes with OK.
    ///
    /// A request containing a negative `ResponseParameter.size` closes the call with
    /// `INVALID_ARGUMENT`; no responses are enqueued for that request.
    public func fullDuplexCall(
      context: StreamingResponseCallContext<Grpc_Testing_StreamingOutputCallResponse>
    ) -> EventLoopFuture<(StreamEvent<Grpc_Testing_StreamingOutputCallRequest>) -> Void> {
      // We don't have support for this yet so just fail the call.
      if context.headers.shouldEchoMetadata {
        return context.eventLoop.makeFailedFuture(TestServiceProvider.echoMetadataNotImplemented)
      }

      // Chain the sends so that responses are written in the order they were requested.
      var sendQueue = context.eventLoop.makeSucceededFuture(())

      func streamHandler(_ event: StreamEvent<Grpc_Testing_StreamingOutputCallRequest>) {
        switch event {
        case let .message(message):
          if message.shouldEchoStatus {
            let code = GRPCStatus.Code(rawValue: numericCast(message.responseStatus.code))
            let status = GRPCStatus(code: code ?? .unknown, message: message.responseStatus.message)
            context.statusPromise.succeed(status)
          } else if message.responseParameters.contains(where: { $0.size < 0 }) {
            // Sizes are client-controlled: close the call instead of trapping on a negative count.
            context.statusPromise.succeed(TestServiceProvider.negativeResponseSize)
          } else {
            for responseParameter in message.responseParameters {
              let response = Grpc_Testing_StreamingOutputCallResponse.with { response in
                response.payload = .zeros(count: numericCast(responseParameter.size))
              }

              sendQueue = sendQueue.flatMap {
                context.sendResponse(response)
              }
            }
          }

        case .end:
          // Close with OK only once every queued response has been sent.
          sendQueue.map { GRPCStatus.ok }.cascade(to: context.statusPromise)
        }
      }

      return context.eventLoop.makeSucceededFuture(streamHandler(_:))
    }

    /// This is not implemented as it is not described in the specification.
    ///
    /// See: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
    public func halfDuplexCall(
      context: StreamingResponseCallContext<Grpc_Testing_StreamingOutputCallResponse>
    ) -> EventLoopFuture<(StreamEvent<Grpc_Testing_StreamingOutputCallRequest>) -> Void> {
      let status = GRPCStatus(
        code: .unimplemented,
        message: "'halfDuplexCall' was not described in the specification"
      )
      return context.eventLoop.makeFailedFuture(status)
    }
  }