ServerStreamingCall.swift 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  /// A gRPC call in which the client sends one request and the server replies with a stream of
  /// responses.
  ///
  /// The single request is written via `send(_:request:)`; each response is passed to the
  /// response handler supplied when the call is made. Metadata and the final status are exposed
  /// as futures.
  public final class ServerStreamingCall<RequestPayload, ResponsePayload>: ClientCall {
    /// The transport through which this call sends its request and reads its response parts.
    private let transport: ChannelTransport<RequestPayload, ResponsePayload>

    /// The options used to make the RPC.
    public let options: CallOptions

    /// Creates a call backed by `transport`. Nothing is sent until `send(_:request:)` is called.
    ///
    /// - Parameters:
    ///   - transport: The transport used to send the request and receive response parts.
    ///   - options: The options the RPC is made with.
    internal init(
      transport: ChannelTransport<RequestPayload, ResponsePayload>,
      options: CallOptions
    ) {
      self.transport = transport
      self.options = options
    }

    // MARK: - Call Properties

    /// The `EventLoop` this call is running on.
    public var eventLoop: EventLoop {
      return self.transport.eventLoop
    }

    /// The `Channel` used to transport messages for this RPC.
    public var subchannel: EventLoopFuture<Channel> {
      return self.transport.streamChannel()
    }

    /// Cancel this RPC if it hasn't already completed.
    ///
    /// - Parameter promise: An optional promise handed to the transport along with the
    ///   cancellation request.
    public func cancel(promise: EventLoopPromise<Void>?) {
      self.transport.cancel(promise: promise)
    }

    // MARK: - Response Parts

    /// The initial metadata returned from the server.
    public var initialMetadata: EventLoopFuture<HPACKHeaders> {
      return self.onEventLoop {
        return self.transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
      }
    }

    /// The trailing metadata returned from the server.
    public var trailingMetadata: EventLoopFuture<HPACKHeaders> {
      return self.onEventLoop {
        return self.transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult()
      }
    }

    /// The final status of the RPC.
    public var status: EventLoopFuture<GRPCStatus> {
      return self.onEventLoop {
        return self.transport.responseContainer.lazyStatusPromise.getFutureResult()
      }
    }

    // MARK: - Sending

    /// Sends the request head and the single request message for this call.
    ///
    /// - Parameters:
    ///   - head: The request head to send.
    ///   - request: The request message to send.
    internal func send(_ head: _GRPCRequestHead, request: RequestPayload) {
      // Whether the message is compressed is decided by the call options.
      let compressRequest = self.options.messageEncoding.enabledForRequests
      self.transport.sendUnary(head, request: request, compressed: compressRequest)
    }

    // MARK: - Helpers

    /// Produces a future by running `body` on this call's event loop.
    ///
    /// When the caller is already on the event loop `body` is invoked directly; otherwise it is
    /// submitted to the event loop and the returned future completes with the result of the
    /// future `body` produces.
    ///
    /// NOTE(review): presumably the lazily created promises in the response container may only
    /// be touched from the event loop, hence the hop -- confirm against `ResponsePartContainer`.
    private func onEventLoop<Value>(
      _ body: @escaping () -> EventLoopFuture<Value>
    ) -> EventLoopFuture<Value> {
      guard self.eventLoop.inEventLoop else {
        return self.eventLoop.flatSubmit(body)
      }
      return body()
    }
  }
  // MARK: - Factories

  extension ServerStreamingCall {
    /// Makes a server-streaming call whose transport is built from an HTTP/2 stream multiplexer.
    ///
    /// - Parameters:
    ///   - multiplexer: A future multiplexer handed to the transport; its event loop is also
    ///     used for the response container.
    ///   - serializer: Serializes `RequestPayload` messages.
    ///   - deserializer: Deserializes `ResponsePayload` messages.
    ///   - callOptions: Options for the RPC; the time limit is forwarded to the transport.
    ///   - errorDelegate: An optional delegate passed to the transport.
    ///   - logger: The logger passed to the transport.
    ///   - responseHandler: The streaming response handler installed in the response container.
    /// - Returns: A call wrapping the newly created transport.
    internal static func makeOnHTTP2Stream<
      Serializer: MessageSerializer,
      Deserializer: MessageDeserializer
    >(
      multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
      serializer: Serializer,
      deserializer: Deserializer,
      callOptions: CallOptions,
      errorDelegate: ClientErrorDelegate?,
      logger: Logger,
      responseHandler: @escaping (ResponsePayload) -> Void
    ) -> ServerStreamingCall<RequestPayload, ResponsePayload>
      where Serializer.Input == RequestPayload,
      Deserializer.Output == ResponsePayload {
      let http2Transport = ChannelTransport<RequestPayload, ResponsePayload>(
        multiplexer: multiplexer,
        serializer: serializer,
        deserializer: deserializer,
        // The response parts are delivered on the multiplexer's event loop.
        responseContainer: .init(
          eventLoop: multiplexer.eventLoop,
          streamingResponseHandler: responseHandler
        ),
        callType: .serverStreaming,
        timeLimit: callOptions.timeLimit,
        errorDelegate: errorDelegate,
        logger: logger
      )

      return ServerStreamingCall<RequestPayload, ResponsePayload>(
        transport: http2Transport,
        options: callOptions
      )
    }

    /// Makes a server-streaming call backed by a fake response stream, if one is provided.
    ///
    /// With a fake response the transport is built around it and the fake is activated before
    /// the call is returned. Without one, the transport comes from
    /// `makeTransportForMissingFakeResponse` on a fresh `EmbeddedEventLoop`.
    ///
    /// - Parameters:
    ///   - serializer: Serializes `RequestPayload` messages (not used by this factory's body).
    ///   - deserializer: Deserializes `ResponsePayload` messages (not used by this factory's
    ///     body).
    ///   - fakeResponse: The fake response stream to back the call with, or `nil`.
    ///   - callOptions: Options for the RPC; the time limit is forwarded to the fake transport.
    ///   - logger: The logger passed to the transport.
    ///   - responseHandler: The streaming response handler installed in the response container.
    /// - Returns: A call wrapping the newly created transport.
    internal static func make<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      serializer: Serializer,
      deserializer: Deserializer,
      fakeResponse: FakeStreamingResponse<RequestPayload, ResponsePayload>?,
      callOptions: CallOptions,
      logger: Logger,
      responseHandler: @escaping (ResponsePayload) -> Void
    ) -> ServerStreamingCall<RequestPayload, ResponsePayload>
      where Serializer.Input == RequestPayload,
      Deserializer.Output == ResponsePayload {
      // Use the fake's event loop when there is one; otherwise fall back to an embedded loop.
      let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
      let container = ResponsePartContainer(
        eventLoop: eventLoop,
        streamingResponseHandler: responseHandler
      )

      guard let fake = fakeResponse else {
        // No fake response was registered for this call.
        let missingTransport: ChannelTransport<RequestPayload, ResponsePayload> =
          .makeTransportForMissingFakeResponse(
            eventLoop: eventLoop,
            responseContainer: container,
            logger: logger
          )
        return ServerStreamingCall(transport: missingTransport, options: callOptions)
      }

      let fakeTransport = ChannelTransport<RequestPayload, ResponsePayload>(
        fakeResponse: fake,
        responseContainer: container,
        timeLimit: callOptions.timeLimit,
        logger: logger
      )
      // Activate the fake only once its transport exists, matching the construction order.
      fake.activate()

      return ServerStreamingCall(transport: fakeTransport, options: callOptions)
    }
  }