StreamingResponseCallContext.swift 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  22. /// An abstract base class for a context provided to handlers for RPCs which may return multiple
  23. /// responses, i.e. server streaming and bidirectional streaming RPCs.
  24. open class StreamingResponseCallContext<ResponsePayload>: ServerCallContextBase {
  25. /// A promise for the `GRPCStatus`, the end of the response stream. This must be completed by
  26. /// bidirectional streaming RPC handlers to end the RPC.
  27. ///
  28. /// Note that while this is also present for server streaming RPCs, it is not necessary to
  29. /// complete this promise: instead, an `EventLoopFuture<GRPCStatus>` must be returned from the
  30. /// handler.
  31. public let statusPromise: EventLoopPromise<GRPCStatus>
  32. public convenience init(
  33. eventLoop: EventLoop,
  34. headers: HPACKHeaders,
  35. logger: Logger,
  36. userInfo: UserInfo = UserInfo()
  37. ) {
  38. self.init(eventLoop: eventLoop, headers: headers, logger: logger, userInfoRef: .init(userInfo))
  39. }
  40. @inlinable
  41. override internal init(
  42. eventLoop: EventLoop,
  43. headers: HPACKHeaders,
  44. logger: Logger,
  45. userInfoRef: Ref<UserInfo>
  46. ) {
  47. self.statusPromise = eventLoop.makePromise()
  48. super.init(eventLoop: eventLoop, headers: headers, logger: logger, userInfoRef: userInfoRef)
  49. }
  50. /// Send a response to the client.
  51. ///
  52. /// - Parameters:
  53. /// - message: The message to send to the client.
  54. /// - compression: Whether compression should be used for this response. If compression
  55. /// is enabled in the call context, the value passed here takes precedence. Defaults to
  56. /// deferring to the value set on the call context.
  57. /// - promise: A promise to complete once the message has been sent.
  58. open func sendResponse(
  59. _ message: ResponsePayload,
  60. compression: Compression = .deferToCallDefault,
  61. promise: EventLoopPromise<Void>?
  62. ) {
  63. fatalError("needs to be overridden")
  64. }
  65. /// Send a response to the client.
  66. ///
  67. /// - Parameters:
  68. /// - message: The message to send to the client.
  69. /// - compression: Whether compression should be used for this response. If compression
  70. /// is enabled in the call context, the value passed here takes precedence. Defaults to
  71. /// deferring to the value set on the call context.
  72. open func sendResponse(
  73. _ message: ResponsePayload,
  74. compression: Compression = .deferToCallDefault
  75. ) -> EventLoopFuture<Void> {
  76. let promise = self.eventLoop.makePromise(of: Void.self)
  77. self.sendResponse(message, compression: compression, promise: promise)
  78. return promise.futureResult
  79. }
  80. /// Sends a sequence of responses to the client.
  81. /// - Parameters:
  82. /// - messages: The messages to send to the client.
  83. /// - compression: Whether compression should be used for this response. If compression
  84. /// is enabled in the call context, the value passed here takes precedence. Defaults to
  85. /// deferring to the value set on the call context.
  86. /// - promise: A promise to complete once the messages have been sent.
  87. open func sendResponses<Messages: Sequence>(
  88. _ messages: Messages,
  89. compression: Compression = .deferToCallDefault,
  90. promise: EventLoopPromise<Void>?
  91. ) where Messages.Element == ResponsePayload {
  92. fatalError("needs to be overridden")
  93. }
  94. /// Sends a sequence of responses to the client.
  95. /// - Parameters:
  96. /// - messages: The messages to send to the client.
  97. /// - compression: Whether compression should be used for this response. If compression
  98. /// is enabled in the call context, the value passed here takes precedence. Defaults to
  99. /// deferring to the value set on the call context.
  100. open func sendResponses<Messages: Sequence>(
  101. _ messages: Messages,
  102. compression: Compression = .deferToCallDefault
  103. ) -> EventLoopFuture<Void> where Messages.Element == ResponsePayload {
  104. let promise = self.eventLoop.makePromise(of: Void.self)
  105. self.sendResponses(messages, compression: compression, promise: promise)
  106. return promise.futureResult
  107. }
  108. }
  109. /// A concrete implementation of `StreamingResponseCallContext` used internally.
  110. @usableFromInline
  111. internal final class _StreamingResponseCallContext<Request, Response>:
  112. StreamingResponseCallContext<Response> {
  113. @usableFromInline
  114. internal let _sendResponse: (Response, MessageMetadata, EventLoopPromise<Void>?) -> Void
  115. @usableFromInline
  116. internal let _compressionEnabledOnServer: Bool
  117. @inlinable
  118. internal init(
  119. eventLoop: EventLoop,
  120. headers: HPACKHeaders,
  121. logger: Logger,
  122. userInfoRef: Ref<UserInfo>,
  123. compressionIsEnabled: Bool,
  124. sendResponse: @escaping (Response, MessageMetadata, EventLoopPromise<Void>?) -> Void
  125. ) {
  126. self._sendResponse = sendResponse
  127. self._compressionEnabledOnServer = compressionIsEnabled
  128. super.init(eventLoop: eventLoop, headers: headers, logger: logger, userInfoRef: userInfoRef)
  129. }
  130. @inlinable
  131. internal func shouldCompress(_ compression: Compression) -> Bool {
  132. guard self._compressionEnabledOnServer else {
  133. return false
  134. }
  135. return compression.isEnabled(callDefault: self.compressionEnabled)
  136. }
  137. @inlinable
  138. override func sendResponse(
  139. _ message: Response,
  140. compression: Compression = .deferToCallDefault,
  141. promise: EventLoopPromise<Void>?
  142. ) {
  143. if self.eventLoop.inEventLoop {
  144. let compress = self.shouldCompress(compression)
  145. self._sendResponse(message, .init(compress: compress, flush: true), promise)
  146. } else {
  147. self.eventLoop.execute {
  148. let compress = self.shouldCompress(compression)
  149. self._sendResponse(message, .init(compress: compress, flush: true), promise)
  150. }
  151. }
  152. }
  153. @inlinable
  154. override func sendResponses<Messages: Sequence>(
  155. _ messages: Messages,
  156. compression: Compression = .deferToCallDefault,
  157. promise: EventLoopPromise<Void>?
  158. ) where Response == Messages.Element {
  159. if self.eventLoop.inEventLoop {
  160. self._sendResponses(messages, compression: compression, promise: promise)
  161. } else {
  162. self.eventLoop.execute {
  163. self._sendResponses(messages, compression: compression, promise: promise)
  164. }
  165. }
  166. }
  167. @inlinable
  168. internal func _sendResponses<Messages: Sequence>(
  169. _ messages: Messages,
  170. compression: Compression,
  171. promise: EventLoopPromise<Void>?
  172. ) where Response == Messages.Element {
  173. let compress = self.shouldCompress(compression)
  174. var iterator = messages.makeIterator()
  175. var next = iterator.next()
  176. while let current = next {
  177. next = iterator.next()
  178. // Attach the promise, if present, to the last message.
  179. let isLast = next == nil
  180. self._sendResponse(current, .init(compress: compress, flush: isLast), isLast ? promise : nil)
  181. }
  182. }
  183. }
  184. /// Concrete implementation of `StreamingResponseCallContext` used for testing.
  185. ///
  186. /// Simply records all sent messages.
  187. open class StreamingResponseCallContextTestStub<ResponsePayload>: StreamingResponseCallContext<ResponsePayload> {
  188. open var recordedResponses: [ResponsePayload] = []
  189. override open func sendResponse(
  190. _ message: ResponsePayload,
  191. compression: Compression = .deferToCallDefault,
  192. promise: EventLoopPromise<Void>?
  193. ) {
  194. self.recordedResponses.append(message)
  195. promise?.succeed(())
  196. }
  197. override open func sendResponses<Messages: Sequence>(
  198. _ messages: Messages,
  199. compression: Compression = .deferToCallDefault,
  200. promise: EventLoopPromise<Void>?
  201. ) where ResponsePayload == Messages.Element {
  202. self.recordedResponses.append(contentsOf: messages)
  203. promise?.succeed(())
  204. }
  205. }