BaseClientCall.swift 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import SwiftProtobuf
  21. import Logging
  22. /// This class provides much of the boilerplate for the four types of gRPC call objects returned to
  23. /// framework users.
  24. ///
  25. /// Each call will be configured on a multiplexed channel on the given connection. The multiplexed
  26. /// channel will be configured as such:
  27. ///
  28. /// ┌──────────────────────────────┐
  29. /// │ ClientResponseChannelHandler │
  30. /// └────────────▲─────────────────┘
  31. /// │ ┌─────────────────────────────┐
  32. /// │ │ ClientRequestChannelHandler │
  33. /// │ └────────────────┬────────────┘
  34. /// GRPCClientResponsePart<T1>│ │GRPCClientRequestPart<T2>
  35. /// ┌─┴───────────────────────▼─┐
  36. /// │ GRPCClientChannelHandler │
  37. /// └─▲───────────────────────┬─┘
  38. /// HTTP2Frame│ │HTTP2Frame
  39. /// | |
  40. ///
  41. /// Note: the "main" pipeline provided by the channel in `ClientConnection`.
  42. ///
  43. /// Setup includes:
  44. /// - creation of an HTTP/2 stream for the call to execute on,
  45. /// - configuration of the NIO channel handlers for the stream, and
  46. /// - setting a call timeout, if one is provided.
  47. ///
  48. /// This class also provides much of the framework user facing functionality via conformance to
  49. /// `ClientCall`.
  50. public class BaseClientCall<Request: Message, Response: Message>: ClientCall {
  51. public typealias RequestMessage = Request
  52. public typealias ResponseMessage = Response
  53. internal let logger: Logger
  54. /// HTTP/2 multiplexer providing the stream.
  55. internal let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
  56. // Note: documentation is inherited from the `ClientCall` protocol.
  57. public let subchannel: EventLoopFuture<Channel>
  58. public let initialMetadata: EventLoopFuture<HPACKHeaders>
  59. public let trailingMetadata: EventLoopFuture<HPACKHeaders>
  60. public let status: EventLoopFuture<GRPCStatus>
  61. /// Sets up a new RPC call.
  62. ///
  63. /// - Parameter eventLoop: The event loop the connection is running on.
  64. /// - Parameter multiplexer: The multiplexer future to use to provide a stream channel.
  65. /// - Parameter callType: The type of RPC call, e.g. unary, server-streaming.
  66. /// - Parameter responseHandler: Channel handler for reading responses.
  67. /// - Parameter requestHandler: Channel handler for writing requests..
  68. /// - Parameter logger: Logger.
  69. init(
  70. eventLoop: EventLoop,
  71. multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
  72. callType: GRPCCallType,
  73. responseHandler: GRPCClientResponseChannelHandler<Response>,
  74. requestHandler: _ClientRequestChannelHandler<Request>,
  75. logger: Logger
  76. ) {
  77. self.logger = logger
  78. self.multiplexer = multiplexer
  79. let streamPromise = eventLoop.makePromise(of: Channel.self)
  80. // Take the futures we need from the response handler.
  81. self.subchannel = streamPromise.futureResult
  82. self.initialMetadata = responseHandler.initialMetadataPromise.futureResult
  83. self.trailingMetadata = responseHandler.trailingMetadataPromise.futureResult
  84. self.status = responseHandler.statusPromise.futureResult
  85. // If the stream (or multiplexer) fail we need to fail any responses.
  86. self.multiplexer.cascadeFailure(to: streamPromise)
  87. streamPromise.futureResult.whenFailure(responseHandler.onError)
  88. // Create an HTTP/2 stream and configure it with the gRPC handler.
  89. self.multiplexer.whenSuccess { multiplexer in
  90. multiplexer.createStreamChannel(promise: streamPromise) { (stream, streamID) -> EventLoopFuture<Void> in
  91. stream.pipeline.addHandlers([
  92. _GRPCClientChannelHandler<Request, Response>(streamID: streamID, callType: callType, logger: logger),
  93. responseHandler,
  94. requestHandler
  95. ])
  96. }
  97. }
  98. // Schedule the timeout.
  99. responseHandler.scheduleTimeout(eventLoop: eventLoop)
  100. }
  101. public func cancel(promise: EventLoopPromise<Void>?) {
  102. self.subchannel.whenComplete {
  103. switch $0 {
  104. case .success(let channel):
  105. self.logger.trace("firing .cancelled event")
  106. channel.pipeline.triggerUserOutboundEvent(GRPCClientUserEvent.cancelled, promise: promise)
  107. case .failure(let error):
  108. promise?.fail(error)
  109. }
  110. }
  111. }
  112. public func cancel() -> EventLoopFuture<Void> {
  113. return self.subchannel.flatMap { channel in
  114. self.logger.trace("firing .cancelled event")
  115. return channel.pipeline.triggerUserOutboundEvent(GRPCClientUserEvent.cancelled)
  116. }
  117. }
  118. }
  119. extension _GRPCRequestHead {
  120. init(
  121. scheme: String,
  122. path: String,
  123. host: String,
  124. requestID: String,
  125. options: CallOptions
  126. ) {
  127. var customMetadata = options.customMetadata
  128. if let requestIDHeader = options.requestIDHeader {
  129. customMetadata.add(name: requestIDHeader, value: requestID)
  130. }
  131. self = _GRPCRequestHead(
  132. method: options.cacheable ? "GET" : "POST",
  133. scheme: scheme,
  134. path: path,
  135. host: host,
  136. timeout: options.timeout,
  137. customMetadata: customMetadata
  138. )
  139. }
  140. }