2
0

BaseClientCall.swift 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import SwiftProtobuf
  21. import Logging
  22. /// This class provides much of the boilerplate for the four types of gRPC call objects returned to
  23. /// framework users.
  24. ///
  25. /// Each call will be configured on a multiplexed channel on the given connection. The multiplexed
  26. /// channel will be configured as such:
  27. ///
  28. /// ┌──────────────────────────────┐
  29. /// │ ClientResponseChannelHandler │
  30. /// └────────────▲─────────────────┘
  31. /// │ ┌─────────────────────────────┐
  32. /// │ │ ClientRequestChannelHandler │
  33. /// │ └────────────────┬────────────┘
  34. /// GRPCClientResponsePart<T1>│ │GRPCClientRequestPart<T2>
  35. /// ┌─┴───────────────────────▼─┐
  36. /// │ GRPCClientChannelHandler │
  37. /// └─▲───────────────────────┬─┘
  38. /// HTTP2Frame│ │HTTP2Frame
  39. /// | |
  40. ///
  41. /// Note: the "main" pipeline provided by the channel in `ClientConnection`.
  42. ///
  43. /// Setup includes:
  44. /// - creation of an HTTP/2 stream for the call to execute on,
  45. /// - configuration of the NIO channel handlers for the stream, and
  46. /// - setting a call timeout, if one is provided.
  47. ///
  48. /// This class also provides much of the framework user facing functionality via conformance to
  49. /// `ClientCall`.
  50. public class BaseClientCall<Request: GRPCPayload, Response: GRPCPayload>: ClientCall {
  51. public typealias RequestPayload = Request
  52. public typealias ResponsePayload = Response
  53. internal let logger: Logger
  54. /// HTTP/2 multiplexer providing the stream.
  55. internal let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
  56. // Note: documentation is inherited from the `ClientCall` protocol.
  57. public let options: CallOptions
  58. public let subchannel: EventLoopFuture<Channel>
  59. public let initialMetadata: EventLoopFuture<HPACKHeaders>
  60. public let trailingMetadata: EventLoopFuture<HPACKHeaders>
  61. public let status: EventLoopFuture<GRPCStatus>
  62. /// Sets up a new RPC call.
  63. ///
  64. /// - Parameter eventLoop: The event loop the connection is running on.
  65. /// - Parameter multiplexer: The multiplexer future to use to provide a stream channel.
  66. /// - Parameter callType: The type of RPC call, e.g. unary, server-streaming.
  67. /// - Parameter responseHandler: Channel handler for reading responses.
  68. /// - Parameter requestHandler: Channel handler for writing requests..
  69. /// - Parameter logger: Logger.
  70. init(
  71. eventLoop: EventLoop,
  72. multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
  73. callType: GRPCCallType,
  74. callOptions: CallOptions,
  75. responseHandler: GRPCClientResponseChannelHandler<Response>,
  76. requestHandler: _ClientRequestChannelHandler<Request>,
  77. logger: Logger
  78. ) {
  79. self.logger = logger
  80. self.multiplexer = multiplexer
  81. self.options = callOptions
  82. let streamPromise = eventLoop.makePromise(of: Channel.self)
  83. // Take the futures we need from the response handler.
  84. self.subchannel = streamPromise.futureResult
  85. self.initialMetadata = responseHandler.initialMetadataPromise.futureResult
  86. self.trailingMetadata = responseHandler.trailingMetadataPromise.futureResult
  87. self.status = responseHandler.statusPromise.futureResult
  88. // If the stream (or multiplexer) fail we need to fail any responses.
  89. self.multiplexer.cascadeFailure(to: streamPromise)
  90. streamPromise.futureResult.whenFailure(responseHandler.onError)
  91. // Create an HTTP/2 stream and configure it with the gRPC handler.
  92. self.multiplexer.whenSuccess { multiplexer in
  93. multiplexer.createStreamChannel(promise: streamPromise) { (stream, streamID) -> EventLoopFuture<Void> in
  94. stream.pipeline.addHandlers([
  95. _GRPCClientChannelHandler<Request, Response>(streamID: streamID, callType: callType, logger: logger),
  96. responseHandler,
  97. requestHandler
  98. ])
  99. }
  100. }
  101. // Schedule the timeout.
  102. responseHandler.scheduleTimeout(eventLoop: eventLoop)
  103. }
  104. public func cancel(promise: EventLoopPromise<Void>?) {
  105. self.subchannel.whenComplete {
  106. switch $0 {
  107. case .success(let channel):
  108. self.logger.trace("firing .cancelled event")
  109. channel.pipeline.triggerUserOutboundEvent(GRPCClientUserEvent.cancelled, promise: promise)
  110. case .failure(let error):
  111. promise?.fail(error)
  112. }
  113. }
  114. }
  115. public func cancel() -> EventLoopFuture<Void> {
  116. return self.subchannel.flatMap { channel in
  117. self.logger.trace("firing .cancelled event")
  118. return channel.pipeline.triggerUserOutboundEvent(GRPCClientUserEvent.cancelled)
  119. }
  120. }
  121. }
  122. extension _GRPCRequestHead {
  123. init(
  124. scheme: String,
  125. path: String,
  126. host: String,
  127. requestID: String,
  128. options: CallOptions
  129. ) {
  130. var customMetadata = options.customMetadata
  131. if let requestIDHeader = options.requestIDHeader {
  132. customMetadata.add(name: requestIDHeader, value: requestID)
  133. }
  134. self = _GRPCRequestHead(
  135. method: options.cacheable ? "GET" : "POST",
  136. scheme: scheme,
  137. path: path,
  138. host: host,
  139. timeout: options.timeout,
  140. customMetadata: customMetadata,
  141. encoding: options.messageEncoding
  142. )
  143. }
  144. }