StreamingResponseCallContext.swift 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIOCore
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  /// Abstract base class for the context handed to RPC handlers that may produce more than one
  /// response message, i.e. server streaming and bidirectional streaming RPCs.
  ///
  /// The promise-taking `sendResponse` and `sendResponses` methods trap unless a subclass
  /// overrides them; the future-returning variants are implemented in terms of those.
  open class StreamingResponseCallContext<ResponsePayload>: ServerCallContextBase {
    /// Promise for the ``GRPCStatus`` that marks the end of the response stream. Handlers of
    /// bidirectional streaming RPCs must complete it in order to end the RPC.
    ///
    /// The promise also exists for server streaming RPCs, but completing it is not required
    /// there: the handler returns an `EventLoopFuture<GRPCStatus>` instead.
    public let statusPromise: EventLoopPromise<GRPCStatus>

    /// Creates a context without a usable close future.
    ///
    /// The close future passed on to the designated initializer is already failed with
    /// `GRPCStatus.closeFutureNotImplemented`.
    @available(*, deprecated, renamed: "init(eventLoop:headers:logger:userInfo:closeFuture:)")
    public convenience init(
      eventLoop: EventLoop,
      headers: HPACKHeaders,
      logger: Logger,
      userInfo: UserInfo = UserInfo()
    ) {
      let wrappedUserInfo = Ref(userInfo)
      let unavailableCloseFuture: EventLoopFuture<Void> = eventLoop.makeFailedFuture(
        GRPCStatus.closeFutureNotImplemented
      )
      self.init(
        eventLoop: eventLoop,
        headers: headers,
        logger: logger,
        userInfoRef: wrappedUserInfo,
        closeFuture: unavailableCloseFuture
      )
    }

    /// Creates a context, boxing `userInfo` in a `Ref` before handing everything to the
    /// designated initializer.
    public convenience init(
      eventLoop: EventLoop,
      headers: HPACKHeaders,
      logger: Logger,
      userInfo: UserInfo = UserInfo(),
      closeFuture: EventLoopFuture<Void>
    ) {
      let wrappedUserInfo = Ref(userInfo)
      self.init(
        eventLoop: eventLoop,
        headers: headers,
        logger: logger,
        userInfoRef: wrappedUserInfo,
        closeFuture: closeFuture
      )
    }

    /// Designated initializer: makes `statusPromise` on `eventLoop` and forwards all arguments
    /// to the superclass.
    @inlinable
    override internal init(
      eventLoop: EventLoop,
      headers: HPACKHeaders,
      logger: Logger,
      userInfoRef: Ref<UserInfo>,
      closeFuture: EventLoopFuture<Void>
    ) {
      self.statusPromise = eventLoop.makePromise(of: GRPCStatus.self)
      super.init(
        eventLoop: eventLoop,
        headers: headers,
        logger: logger,
        userInfoRef: userInfoRef,
        closeFuture: closeFuture
      )
    }

    /// Send a single response to the client.
    ///
    /// This base implementation always traps; subclasses must override it.
    ///
    /// - Parameters:
    ///   - message: The message to send to the client.
    ///   - compression: Whether compression should be used for this response. If compression
    ///     is enabled in the call context, the value passed here takes precedence. Defaults to
    ///     deferring to the value set on the call context.
    ///   - promise: A promise to complete once the message has been sent.
    open func sendResponse(
      _ message: ResponsePayload,
      compression: Compression = .deferToCallDefault,
      promise: EventLoopPromise<Void>?
    ) {
      fatalError("needs to be overridden")
    }

    /// Send a single response to the client.
    ///
    /// - Parameters:
    ///   - message: The message to send to the client.
    ///   - compression: Whether compression should be used for this response. If compression
    ///     is enabled in the call context, the value passed here takes precedence. Defaults to
    ///     deferring to the value set on the call context.
    /// - Returns: The future of a promise, created on this context's event loop, which is
    ///   passed to the promise-taking `sendResponse`.
    open func sendResponse(
      _ message: ResponsePayload,
      compression: Compression = .deferToCallDefault
    ) -> EventLoopFuture<Void> {
      let sent: EventLoopPromise<Void> = self.eventLoop.makePromise()
      self.sendResponse(message, compression: compression, promise: sent)
      return sent.futureResult
    }

    /// Send a sequence of responses to the client.
    ///
    /// This base implementation always traps; subclasses must override it.
    ///
    /// - Parameters:
    ///   - messages: The messages to send to the client.
    ///   - compression: Whether compression should be used for this response. If compression
    ///     is enabled in the call context, the value passed here takes precedence. Defaults to
    ///     deferring to the value set on the call context.
    ///   - promise: A promise to complete once the messages have been sent.
    open func sendResponses<Messages: Sequence>(
      _ messages: Messages,
      compression: Compression = .deferToCallDefault,
      promise: EventLoopPromise<Void>?
    ) where Messages.Element == ResponsePayload {
      fatalError("needs to be overridden")
    }

    /// Send a sequence of responses to the client.
    ///
    /// - Parameters:
    ///   - messages: The messages to send to the client.
    ///   - compression: Whether compression should be used for this response. If compression
    ///     is enabled in the call context, the value passed here takes precedence. Defaults to
    ///     deferring to the value set on the call context.
    /// - Returns: The future of a promise, created on this context's event loop, which is
    ///   passed to the promise-taking `sendResponses`.
    open func sendResponses<Messages: Sequence>(
      _ messages: Messages,
      compression: Compression = .deferToCallDefault
    ) -> EventLoopFuture<Void> where Messages.Element == ResponsePayload {
      let allSent: EventLoopPromise<Void> = self.eventLoop.makePromise()
      self.sendResponses(messages, compression: compression, promise: allSent)
      return allSent.futureResult
    }
  }
  /// A concrete implementation of `StreamingResponseCallContext` used internally.
  ///
  /// Responses are handed to the `sendResponse` closure supplied at initialization. The
  /// `sendResponse`/`sendResponses` overrides hop onto the context's event loop before
  /// invoking that closure when they are called from another thread.
  @usableFromInline
  internal final class _StreamingResponseCallContext<Request, Response>:
    StreamingResponseCallContext<Response>
  {
    /// Closure receiving each response together with its metadata (compress/flush) and an
    /// optional promise.
    @usableFromInline
    internal let _sendResponse: (Response, MessageMetadata, EventLoopPromise<Void>?) -> Void

    /// Whether compression is enabled at all; when `false`, no response is ever compressed.
    @usableFromInline
    internal let _compressionEnabledOnServer: Bool

    /// Stores the send closure and the compression flag, then forwards the remaining
    /// arguments to the superclass initializer.
    @inlinable
    internal init(
      eventLoop: EventLoop,
      headers: HPACKHeaders,
      logger: Logger,
      userInfoRef: Ref<UserInfo>,
      compressionIsEnabled: Bool,
      closeFuture: EventLoopFuture<Void>,
      sendResponse: @escaping (Response, MessageMetadata, EventLoopPromise<Void>?) -> Void
    ) {
      self._sendResponse = sendResponse
      self._compressionEnabledOnServer = compressionIsEnabled
      super.init(
        eventLoop: eventLoop,
        headers: headers,
        logger: logger,
        userInfoRef: userInfoRef,
        closeFuture: closeFuture
      )
    }

    /// Decides whether a response should be compressed.
    ///
    /// - Parameter compression: The per-message compression request.
    /// - Returns: `false` if compression is disabled on the server; otherwise the result of
    ///   resolving `compression` against the call's `compressionEnabled` setting.
    @inlinable
    internal func shouldCompress(_ compression: Compression) -> Bool {
      guard self._compressionEnabledOnServer else {
        return false
      }
      return compression.isEnabled(callDefault: self.compressionEnabled)
    }

    /// Sends a single response and flushes it.
    ///
    /// - Parameters:
    ///   - message: The message to send to the client.
    ///   - compression: The per-message compression request, resolved via `shouldCompress`.
    ///   - promise: Passed to the send closure along with the message.
    @inlinable
    override func sendResponse(
      _ message: Response,
      compression: Compression = .deferToCallDefault,
      promise: EventLoopPromise<Void>?
    ) {
      if self.eventLoop.inEventLoop {
        let compress = self.shouldCompress(compression)
        self._sendResponse(message, .init(compress: compress, flush: true), promise)
      } else {
        // The compression decision is deliberately made inside the closure so that it is
        // evaluated on the event loop, like the on-loop branch above.
        self.eventLoop.execute {
          let compress = self.shouldCompress(compression)
          self._sendResponse(message, .init(compress: compress, flush: true), promise)
        }
      }
    }

    /// Sends a sequence of responses, hopping onto the event loop first if necessary.
    ///
    /// - Parameters:
    ///   - messages: The messages to send to the client.
    ///   - compression: The per-message compression request, applied to every message.
    ///   - promise: Attached to the last message, or succeeded immediately when `messages`
    ///     is empty.
    @inlinable
    override func sendResponses<Messages: Sequence>(
      _ messages: Messages,
      compression: Compression = .deferToCallDefault,
      promise: EventLoopPromise<Void>?
    ) where Response == Messages.Element {
      if self.eventLoop.inEventLoop {
        self._sendResponses(messages, compression: compression, promise: promise)
      } else {
        self.eventLoop.execute {
          self._sendResponses(messages, compression: compression, promise: promise)
        }
      }
    }

    /// Sends each message in order, flushing only after the last one.
    ///
    /// - Parameters:
    ///   - messages: The messages to send to the client.
    ///   - compression: The per-message compression request, resolved once for the batch.
    ///   - promise: Attached to the last message. If `messages` is empty there is no message
    ///     to attach it to, so it is succeeded immediately.
    @inlinable
    internal func _sendResponses<Messages: Sequence>(
      _ messages: Messages,
      compression: Compression,
      promise: EventLoopPromise<Void>?
    ) where Response == Messages.Element {
      let compress = self.shouldCompress(compression)
      var iterator = messages.makeIterator()

      guard var current = iterator.next() else {
        // Nothing to send. Without this the promise would never be completed and callers
        // waiting on its future would hang.
        promise?.succeed(())
        return
      }

      // Look one element ahead so the final message can be identified: every message which
      // has a successor is written without a flush and without the promise.
      while let upcoming = iterator.next() {
        self._sendResponse(current, .init(compress: compress, flush: false), nil)
        current = upcoming
      }

      // The last message triggers the flush and carries the promise, if present.
      self._sendResponse(current, .init(compress: compress, flush: true), promise)
    }
  }
  /// A `StreamingResponseCallContext` for use in tests.
  ///
  /// Nothing is sent anywhere: each message is stored in `recordedResponses` and the supplied
  /// promise, if any, is succeeded straight away. The `compression` argument is ignored.
  open class StreamingResponseCallContextTestStub<ResponsePayload>:
    StreamingResponseCallContext<ResponsePayload>
  {
    /// Every message passed to `sendResponse` or `sendResponses`, in call order.
    open var recordedResponses: [ResponsePayload] = []

    /// Records `message`, then succeeds `promise`.
    override open func sendResponse(
      _ message: ResponsePayload,
      compression: Compression = .deferToCallDefault,
      promise: EventLoopPromise<Void>?
    ) {
      // Runs on scope exit, i.e. after the message has been recorded.
      defer { promise?.succeed(()) }
      self.recordedResponses += [message]
    }

    /// Records all of `messages` in a single mutation, then succeeds `promise`.
    override open func sendResponses<Messages: Sequence>(
      _ messages: Messages,
      compression: Compression = .deferToCallDefault,
      promise: EventLoopPromise<Void>?
    ) where ResponsePayload == Messages.Element {
      // Runs on scope exit, i.e. after the messages have been recorded.
      defer { promise?.succeed(()) }
      self.recordedResponses += messages
    }
  }