TestServiceProvider_NIO.swift 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import SwiftGRPCNIO
  18. import NIO
  /// A service provider for the gRPC interoperability test suite.
  ///
  /// See: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#server
  public class TestServiceProvider_NIO: Grpc_Testing_TestServiceProvider_NIO {
    public init() { }

    /// Status used to fail calls which ask for their metadata to be echoed back.
    private static let echoMetadataNotImplemented = GRPCStatus(
      code: .unimplemented,
      message: "Echoing metadata is not yet supported")

    /// Status used to reject requests which ask for a response payload of negative size.
    private static let negativeResponseSize = GRPCStatus(
      code: .invalidArgument,
      message: "Response payload size must not be negative")

    /// Features that this server implements.
    ///
    /// Some 'features' are methods, whilst others optionally modify the outcome of those methods. The
    /// specification is not explicit about where these modifying features should be implemented (i.e.
    /// which methods should support them) and they are not listed in the individual method
    /// descriptions. As such implementation of these modifying features within each method is
    /// determined by the features required by each test.
    public static var implementedFeatures: Set<ServerFeature> {
      return [
        .emptyCall,
        .unaryCall,
        .streamingOutputCall,
        .streamingInputCall,
        .fullDuplexCall,
        .echoStatus
      ]
    }

    /// Server implements `emptyCall` which immediately returns the empty message.
    ///
    /// - Parameters:
    ///   - request: The (empty) request message; its content is not inspected.
    ///   - context: The call context, used here only for its event loop.
    /// - Returns: An already succeeded future holding an empty message.
    public func emptyCall(
      request: Grpc_Testing_Empty,
      context: StatusOnlyCallContext
    ) -> EventLoopFuture<Grpc_Testing_Empty> {
      return context.eventLoop.makeSucceededFuture(Grpc_Testing_Empty())
    }

    /// Server implements `unaryCall` which immediately returns a `SimpleResponse` with a payload
    /// body of size `SimpleRequest.responseSize` bytes and type as appropriate for the
    /// `SimpleRequest.responseType`.
    ///
    /// If the server does not support the `responseType`, then it should fail the RPC with
    /// `INVALID_ARGUMENT`. A negative `responseSize` is rejected with `INVALID_ARGUMENT` as well.
    ///
    /// - Parameters:
    ///   - request: The request describing the desired response (and, optionally, a status to echo).
    ///   - context: The call context providing the event loop and the request headers.
    /// - Returns: A future which succeeds with the response, or fails with a `GRPCStatus`: the
    ///   status requested by the client, `.unimplemented` when metadata echoing is requested, or
    ///   `.invalidArgument` for an unrecognized response type or a negative response size.
    public func unaryCall(
      request: Grpc_Testing_SimpleRequest,
      context: StatusOnlyCallContext
    ) -> EventLoopFuture<Grpc_Testing_SimpleResponse> {
      if request.shouldEchoStatus {
        // Status codes which can't be mapped to a known `StatusCode` are reported as `.unknown`.
        let code = StatusCode(rawValue: numericCast(request.responseStatus.code)) ?? .unknown
        return context.eventLoop.makeFailedFuture(GRPCStatus(code: code, message: request.responseStatus.message))
      }

      if context.request.headers.shouldEchoMetadata {
        return context.eventLoop.makeFailedFuture(TestServiceProvider_NIO.echoMetadataNotImplemented)
      }

      if case .UNRECOGNIZED = request.responseType {
        return context.eventLoop.makeFailedFuture(GRPCStatus(code: .invalidArgument, message: nil))
      }

      // The size is a signed value supplied by the client; a negative value is not a valid byte
      // count for the payload, so fail the call instead of attempting to build it.
      if request.responseSize < 0 {
        return context.eventLoop.makeFailedFuture(TestServiceProvider_NIO.negativeResponseSize)
      }

      let response = Grpc_Testing_SimpleResponse.with { response in
        response.payload = Grpc_Testing_Payload.with { payload in
          payload.body = Data(repeating: 0, count: numericCast(request.responseSize))
          payload.type = request.responseType
        }
      }

      return context.eventLoop.makeSucceededFuture(response)
    }

    /// Server gets the default `SimpleRequest` proto as the request. The content of the request is
    /// ignored. It returns the `SimpleResponse` proto with the payload set to current timestamp.
    /// The timestamp is an integer representing current time with nanosecond resolution. This
    /// integer is formatted as ASCII decimal in the response. The format is not really important as
    /// long as the response payload is different for each request. In addition it adds cache control
    /// headers such that the response can be cached by proxies in the response path. Server should
    /// be behind a caching proxy for this test to pass. Currently we set the max-age to 60 seconds.
    ///
    /// The description above is that of the specification; this implementation always fails the
    /// call with `.unimplemented`.
    public func cacheableUnaryCall(
      request: Grpc_Testing_SimpleRequest,
      context: StatusOnlyCallContext
    ) -> EventLoopFuture<Grpc_Testing_SimpleResponse> {
      let status = GRPCStatus(
        code: .unimplemented,
        message: "'cacheableUnaryCall' requires control of the initial metadata which isn't supported"
      )

      return context.eventLoop.makeFailedFuture(status)
    }

    /// Server implements `streamingOutputCall` by replying, in order, with one
    /// `StreamingOutputCallResponse` for each `ResponseParameter`s in `StreamingOutputCallRequest`.
    /// Each `StreamingOutputCallResponse` should have a payload body of size `ResponseParameter.size`
    /// bytes, as specified by its respective `ResponseParameter`. After sending all responses, it
    /// closes with OK.
    ///
    /// If any `ResponseParameter.size` is negative no responses are sent and the call is closed
    /// with `INVALID_ARGUMENT`.
    ///
    /// - Parameters:
    ///   - request: The request listing the responses to send.
    ///   - context: The call context used to send the responses.
    /// - Returns: A future for the status the call is closed with.
    public func streamingOutputCall(
      request: Grpc_Testing_StreamingOutputCallRequest,
      context: StreamingResponseCallContext<Grpc_Testing_StreamingOutputCallResponse>
    ) -> EventLoopFuture<GRPCStatus> {
      // Validate every size before sending anything: sizes are signed values supplied by the
      // client and a negative value is not a valid byte count for a payload.
      if request.responseParameters.contains(where: { $0.size < 0 }) {
        return context.eventLoop.makeSucceededFuture(TestServiceProvider_NIO.negativeResponseSize)
      }

      // Chain the sends so that each response is only sent once the previous send has completed.
      var responseQueue = context.eventLoop.makeSucceededFuture(())

      for responseParameter in request.responseParameters {
        responseQueue = responseQueue.flatMap {
          let response = Grpc_Testing_StreamingOutputCallResponse.with { response in
            response.payload = Grpc_Testing_Payload.with { payload in
              payload.body = Data(repeating: 0, count: numericCast(responseParameter.size))
            }
          }

          return context.sendResponse(response)
        }
      }

      return responseQueue.map { GRPCStatus.ok }
    }

    /// Server implements `streamingInputCall` which upon half close immediately returns a
    /// `StreamingInputCallResponse` where `aggregatedPayloadSize` is the sum of all request payload
    /// bodies received.
    ///
    /// - Parameter context: The call context whose `responsePromise` is completed on half close.
    /// - Returns: A future holding the handler to call for each event on the request stream.
    public func streamingInputCall(
      context: UnaryResponseCallContext<Grpc_Testing_StreamingInputCallResponse>
    ) -> EventLoopFuture<(StreamEvent<Grpc_Testing_StreamingInputCallRequest>) -> Void> {
      // Running total of the received payload sizes; captured and updated by the handler below.
      var aggregatePayloadSize = 0

      return context.eventLoop.makeSucceededFuture({ event in
        switch event {
        case .message(let request):
          aggregatePayloadSize += request.payload.body.count

        case .end:
          context.responsePromise.succeed(Grpc_Testing_StreamingInputCallResponse.with { response in
            response.aggregatedPayloadSize = numericCast(aggregatePayloadSize)
          })
        }
      })
    }

    /// Server implements `fullDuplexCall` by replying, in order, with one
    /// `StreamingOutputCallResponse` for each `ResponseParameter`s in each
    /// `StreamingOutputCallRequest`. Each `StreamingOutputCallResponse` should have a payload body
    /// of size `ResponseParameter.size` bytes, as specified by its respective `ResponseParameter`s.
    /// After receiving half close and sending all responses, it closes with OK.
    ///
    /// A request asking for a status to be echoed completes the call's status with that status. A
    /// request containing a negative `ResponseParameter.size` completes the call's status with
    /// `INVALID_ARGUMENT` and none of that request's responses are queued.
    ///
    /// - Parameter context: The call context used to send responses and to complete the status.
    /// - Returns: A future holding the handler to call for each event on the request stream, or a
    ///   failed future (`.unimplemented`) if the client asked for its metadata to be echoed.
    public func fullDuplexCall(
      context: StreamingResponseCallContext<Grpc_Testing_StreamingOutputCallResponse>
    ) -> EventLoopFuture<(StreamEvent<Grpc_Testing_StreamingOutputCallRequest>) -> Void> {
      // We don't have support for this yet so just fail the call.
      if context.request.headers.shouldEchoMetadata {
        return context.eventLoop.makeFailedFuture(TestServiceProvider_NIO.echoMetadataNotImplemented)
      }

      // Chain of pending sends; each response is sent once the previous send has completed.
      var sendQueue = context.eventLoop.makeSucceededFuture(())

      func streamHandler(_ event: StreamEvent<Grpc_Testing_StreamingOutputCallRequest>) {
        switch event {
        case .message(let message):
          if message.shouldEchoStatus {
            // Status codes which can't be mapped to a known `StatusCode` are reported as `.unknown`.
            let code = StatusCode(rawValue: numericCast(message.responseStatus.code))
            let status = GRPCStatus(code: code ?? .unknown, message: message.responseStatus.message)
            context.statusPromise.succeed(status)
          } else if message.responseParameters.contains(where: { $0.size < 0 }) {
            // Sizes are signed values supplied by the client; a negative value is not a valid byte
            // count for a payload, so reject the request instead of attempting to build it.
            context.statusPromise.succeed(TestServiceProvider_NIO.negativeResponseSize)
          } else {
            for responseParameter in message.responseParameters {
              let response = Grpc_Testing_StreamingOutputCallResponse.with { response in
                response.payload = .zeros(count: numericCast(responseParameter.size))
              }

              sendQueue = sendQueue.flatMap {
                context.sendResponse(response)
              }
            }
          }

        case .end:
          // Close with OK once every queued response has been sent.
          sendQueue.map { GRPCStatus.ok }.cascade(to: context.statusPromise)
        }
      }

      return context.eventLoop.makeSucceededFuture(streamHandler(_:))
    }

    /// This is not implemented as it is not described in the specification.
    ///
    /// See: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
    public func halfDuplexCall(
      context: StreamingResponseCallContext<Grpc_Testing_StreamingOutputCallResponse>
    ) -> EventLoopFuture<(StreamEvent<Grpc_Testing_StreamingOutputCallRequest>) -> Void> {
      let status = GRPCStatus(
        code: .unimplemented,
        message: "'halfDuplexCall' was not described in the specification"
      )

      return context.eventLoop.makeFailedFuture(status)
    }
  }