StreamingResponseCallContext.swift 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  22. /// Abstract base class exposing a method to send multiple messages over the wire and a promise for the final RPC status.
  23. ///
  24. /// - When `statusPromise` is fulfilled, the call is closed and the provided status transmitted.
  25. /// - If `statusPromise` is failed and the error is of type `GRPCStatusTransformable`,
  26. /// the result of `error.asGRPCStatus()` will be returned to the client.
  27. /// - If `error.asGRPCStatus()` is not available, `GRPCStatus.processingError` is returned to the client.
  28. open class StreamingResponseCallContext<ResponsePayload>: ServerCallContextBase {
  29. typealias WrappedResponse = _GRPCServerResponsePart<ResponsePayload>
  30. public let statusPromise: EventLoopPromise<GRPCStatus>
  31. public convenience init(
  32. eventLoop: EventLoop,
  33. headers: HPACKHeaders,
  34. logger: Logger,
  35. userInfo: UserInfo = UserInfo()
  36. ) {
  37. self.init(eventLoop: eventLoop, headers: headers, logger: logger, userInfoRef: .init(userInfo))
  38. }
  39. override internal init(
  40. eventLoop: EventLoop,
  41. headers: HPACKHeaders,
  42. logger: Logger,
  43. userInfoRef: Ref<UserInfo>
  44. ) {
  45. self.statusPromise = eventLoop.makePromise()
  46. super.init(eventLoop: eventLoop, headers: headers, logger: logger, userInfoRef: userInfoRef)
  47. }
  48. @available(*, deprecated, renamed: "init(eventLoop:path:headers:logger:userInfo:)")
  49. override public init(eventLoop: EventLoop, request: HTTPRequestHead, logger: Logger) {
  50. self.statusPromise = eventLoop.makePromise()
  51. super.init(eventLoop: eventLoop, request: request, logger: logger)
  52. }
  53. /// Send a response to the client.
  54. ///
  55. /// - Parameters:
  56. /// - message: The message to send to the client.
  57. /// - compression: Whether compression should be used for this response. If compression
  58. /// is enabled in the call context, the value passed here takes precedence. Defaults to
  59. /// deferring to the value set on the call context.
  60. /// - promise: A promise to complete once the message has been sent.
  61. open func sendResponse(
  62. _ message: ResponsePayload,
  63. compression: Compression = .deferToCallDefault,
  64. promise: EventLoopPromise<Void>?
  65. ) {
  66. fatalError("needs to be overridden")
  67. }
  68. /// Send a response to the client.
  69. ///
  70. /// - Parameters:
  71. /// - message: The message to send to the client.
  72. /// - compression: Whether compression should be used for this response. If compression
  73. /// is enabled in the call context, the value passed here takes precedence. Defaults to
  74. /// deferring to the value set on the call context.
  75. open func sendResponse(
  76. _ message: ResponsePayload,
  77. compression: Compression = .deferToCallDefault
  78. ) -> EventLoopFuture<Void> {
  79. let promise = self.eventLoop.makePromise(of: Void.self)
  80. self.sendResponse(message, compression: compression, promise: promise)
  81. return promise.futureResult
  82. }
  83. /// Sends a sequence of responses to the client.
  84. /// - Parameters:
  85. /// - messages: The messages to send to the client.
  86. /// - compression: Whether compression should be used for this response. If compression
  87. /// is enabled in the call context, the value passed here takes precedence. Defaults to
  88. /// deferring to the value set on the call context.
  89. /// - promise: A promise to complete once the messages have been sent.
  90. open func sendResponses<Messages: Sequence>(
  91. _ messages: Messages,
  92. compression: Compression = .deferToCallDefault,
  93. promise: EventLoopPromise<Void>?
  94. ) where Messages.Element == ResponsePayload {
  95. fatalError("needs to be overridden")
  96. }
  97. /// Sends a sequence of responses to the client.
  98. /// - Parameters:
  99. /// - messages: The messages to send to the client.
  100. /// - compression: Whether compression should be used for this response. If compression
  101. /// is enabled in the call context, the value passed here takes precedence. Defaults to
  102. /// deferring to the value set on the call context.
  103. open func sendResponses<Messages: Sequence>(
  104. _ messages: Messages,
  105. compression: Compression = .deferToCallDefault
  106. ) -> EventLoopFuture<Void> where Messages.Element == ResponsePayload {
  107. let promise = self.eventLoop.makePromise(of: Void.self)
  108. self.sendResponses(messages, compression: compression, promise: promise)
  109. return promise.futureResult
  110. }
  111. }
  112. internal final class _StreamingResponseCallContext<Request, Response>:
  113. StreamingResponseCallContext<Response> {
  114. private let _sendResponse: (Response, MessageMetadata, EventLoopPromise<Void>?) -> Void
  115. internal init(
  116. eventLoop: EventLoop,
  117. headers: HPACKHeaders,
  118. logger: Logger,
  119. userInfoRef: Ref<UserInfo>,
  120. sendResponse: @escaping (Response, MessageMetadata, EventLoopPromise<Void>?) -> Void
  121. ) {
  122. self._sendResponse = sendResponse
  123. super.init(eventLoop: eventLoop, headers: headers, logger: logger, userInfoRef: userInfoRef)
  124. }
  125. override func sendResponse(
  126. _ message: Response,
  127. compression: Compression = .deferToCallDefault,
  128. promise: EventLoopPromise<Void>?
  129. ) {
  130. let compress = compression.isEnabled(callDefault: self.compressionEnabled)
  131. self._sendResponse(message, .init(compress: compress, flush: true), promise)
  132. }
  133. override func sendResponses<Messages: Sequence>(
  134. _ messages: Messages,
  135. compression: Compression = .deferToCallDefault,
  136. promise: EventLoopPromise<Void>?
  137. ) where Response == Messages.Element {
  138. let compress = compression.isEnabled(callDefault: self.compressionEnabled)
  139. var iterator = messages.makeIterator()
  140. var next = iterator.next()
  141. while let current = next {
  142. next = iterator.next()
  143. // Attach the promise, if present, to the last message.
  144. let isLast = next == nil
  145. self._sendResponse(current, .init(compress: compress, flush: isLast), isLast ? promise : nil)
  146. }
  147. }
  148. }
  149. /// Concrete implementation of `StreamingResponseCallContext` used by our generated code.
  150. open class StreamingResponseCallContextImpl<ResponsePayload>: StreamingResponseCallContext<ResponsePayload> {
  151. public let channel: Channel
  152. /// - Parameters:
  153. /// - channel: The NIO channel the call is handled on.
  154. /// - headers: The headers provided with this call.
  155. /// - errorDelegate: Provides a means for transforming status promise failures to `GRPCStatusTransformable` before
  156. /// sending them to the client.
  157. /// - logger: A logger.
  158. ///
  159. /// Note: `errorDelegate` is not called for status promise that are `succeeded` with a non-OK status.
  160. public init(
  161. channel: Channel,
  162. headers: HPACKHeaders,
  163. errorDelegate: ServerErrorDelegate?,
  164. logger: Logger
  165. ) {
  166. self.channel = channel
  167. super.init(
  168. eventLoop: channel.eventLoop,
  169. headers: headers,
  170. logger: logger,
  171. userInfoRef: Ref(UserInfo())
  172. )
  173. self.statusPromise.futureResult.whenComplete { result in
  174. switch result {
  175. case let .success(status):
  176. self.channel.writeAndFlush(
  177. self.wrap(.statusAndTrailers(status, self.trailers)),
  178. promise: nil
  179. )
  180. case let .failure(error):
  181. let (status, trailers) = self.processObserverError(error, delegate: errorDelegate)
  182. self.channel.writeAndFlush(self.wrap(.statusAndTrailers(status, trailers)), promise: nil)
  183. }
  184. }
  185. }
  186. /// Wrap the response part in a `NIOAny`. This is useful in order to avoid explicitly spelling
  187. /// out `NIOAny(WrappedResponse(...))`.
  188. private func wrap(_ response: WrappedResponse) -> NIOAny {
  189. return NIOAny(response)
  190. }
  191. @available(*, deprecated, renamed: "init(channel:headers:errorDelegate:logger:)")
  192. public convenience init(
  193. channel: Channel,
  194. request: HTTPRequestHead,
  195. errorDelegate: ServerErrorDelegate?,
  196. logger: Logger
  197. ) {
  198. self.init(
  199. channel: channel,
  200. headers: HPACKHeaders(httpHeaders: request.headers, normalizeHTTPHeaders: false),
  201. errorDelegate: errorDelegate,
  202. logger: logger
  203. )
  204. }
  205. override open func sendResponse(
  206. _ message: ResponsePayload,
  207. compression: Compression = .deferToCallDefault,
  208. promise: EventLoopPromise<Void>?
  209. ) {
  210. let response = _MessageContext(
  211. message,
  212. compressed: compression.isEnabled(callDefault: self.compressionEnabled)
  213. )
  214. self.channel.writeAndFlush(self.wrap(.message(response)), promise: promise)
  215. }
  216. override open func sendResponses<Messages: Sequence>(
  217. _ messages: Messages,
  218. compression: Compression = .deferToCallDefault,
  219. promise: EventLoopPromise<Void>?
  220. ) where ResponsePayload == Messages.Element {
  221. let compress = compression.isEnabled(callDefault: self.compressionEnabled)
  222. var iterator = messages.makeIterator()
  223. var next = iterator.next()
  224. while let current = next {
  225. next = iterator.next()
  226. // Attach the promise, if present, to the last message.
  227. let isLast = next == nil
  228. self.channel.write(
  229. self.wrap(.message(.init(current, compressed: compress))),
  230. promise: isLast ? promise : nil
  231. )
  232. }
  233. self.channel.flush()
  234. }
  235. }
  236. /// Concrete implementation of `StreamingResponseCallContext` used for testing.
  237. ///
  238. /// Simply records all sent messages.
  239. open class StreamingResponseCallContextTestStub<ResponsePayload>: StreamingResponseCallContext<ResponsePayload> {
  240. open var recordedResponses: [ResponsePayload] = []
  241. override open func sendResponse(
  242. _ message: ResponsePayload,
  243. compression: Compression = .deferToCallDefault,
  244. promise: EventLoopPromise<Void>?
  245. ) {
  246. self.recordedResponses.append(message)
  247. promise?.succeed(())
  248. }
  249. override open func sendResponses<Messages: Sequence>(
  250. _ messages: Messages,
  251. compression: Compression = .deferToCallDefault,
  252. promise: EventLoopPromise<Void>?
  253. ) where ResponsePayload == Messages.Element {
  254. self.recordedResponses.append(contentsOf: messages)
  255. promise?.succeed(())
  256. }
  257. }