ClientTransportFactory.swift 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHTTP2
  18. import protocol SwiftProtobuf.Message
  19. /// A `ClientTransport` factory for an RPC.
  20. @usableFromInline
  21. internal struct ClientTransportFactory<Request, Response> {
  22. /// The underlying transport factory.
  23. private var factory: Factory<Request, Response>
  24. private enum Factory<Request, Response> {
  25. case http2(HTTP2ClientTransportFactory<Request, Response>)
  26. case fake(FakeClientTransportFactory<Request, Response>)
  27. }
  28. private init(_ http2: HTTP2ClientTransportFactory<Request, Response>) {
  29. self.factory = .http2(http2)
  30. }
  31. private init(_ fake: FakeClientTransportFactory<Request, Response>) {
  32. self.factory = .fake(fake)
  33. }
  34. /// Create a transport factory for HTTP/2 based transport with `SwiftProtobuf.Message` messages.
  35. /// - Parameters:
  36. /// - multiplexer: The multiplexer used to create an HTTP/2 stream for the RPC.
  37. /// - host: The value of the ":authority" pseudo header.
  38. /// - scheme: The value of the ":scheme" pseudo header.
  39. /// - errorDelegate: A client error delegate.
  40. /// - Returns: A factory for making and configuring HTTP/2 based transport.
  41. internal static func http2<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  42. multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
  43. authority: String,
  44. scheme: String,
  45. errorDelegate: ClientErrorDelegate?
  46. ) -> ClientTransportFactory<Request, Response> {
  47. let http2 = HTTP2ClientTransportFactory<Request, Response>(
  48. multiplexer: multiplexer,
  49. scheme: scheme,
  50. authority: authority,
  51. serializer: ProtobufSerializer(),
  52. deserializer: ProtobufDeserializer(),
  53. errorDelegate: errorDelegate
  54. )
  55. return .init(http2)
  56. }
  57. /// Create a transport factory for HTTP/2 based transport with `GRPCPayload` messages.
  58. /// - Parameters:
  59. /// - multiplexer: The multiplexer used to create an HTTP/2 stream for the RPC.
  60. /// - host: The value of the ":authority" pseudo header.
  61. /// - scheme: The value of the ":scheme" pseudo header.
  62. /// - errorDelegate: A client error delegate.
  63. /// - Returns: A factory for making and configuring HTTP/2 based transport.
  64. internal static func http2<Request: GRPCPayload, Response: GRPCPayload>(
  65. multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
  66. authority: String,
  67. scheme: String,
  68. errorDelegate: ClientErrorDelegate?
  69. ) -> ClientTransportFactory<Request, Response> {
  70. let http2 = HTTP2ClientTransportFactory<Request, Response>(
  71. multiplexer: multiplexer,
  72. scheme: scheme,
  73. authority: authority,
  74. serializer: GRPCPayloadSerializer(),
  75. deserializer: GRPCPayloadDeserializer(),
  76. errorDelegate: errorDelegate
  77. )
  78. return .init(http2)
  79. }
  80. /// Make a factory for 'fake' transport.
  81. /// - Parameter fakeResponse: The fake response stream.
  82. /// - Returns: A factory for making and configuring fake transport.
  83. internal static func fake(
  84. _ fakeResponse: _FakeResponseStream<Request, Response>?,
  85. on eventLoop: EventLoop
  86. ) -> ClientTransportFactory<Request, Response> {
  87. return .init(FakeClientTransportFactory(fakeResponse, on: eventLoop))
  88. }
  89. /// Makes a configured `ClientTransport`.
  90. /// - Parameters:
  91. /// - path: The path of the RPC, e.g. "/echo.Echo/Get".
  92. /// - type: The type of RPC, e.g. `.unary`.
  93. /// - options: Options for the RPC.
  94. /// - interceptors: Interceptors to use for the RPC.
  95. /// - onResponsePart: A closure called for each response part received.
  96. /// - Returns: A configured transport.
  97. internal func makeConfiguredTransport<Request, Response>(
  98. to path: String,
  99. for type: GRPCCallType,
  100. withOptions options: CallOptions,
  101. interceptedBy interceptors: [ClientInterceptor<Request, Response>],
  102. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  103. ) -> ClientTransport<Request, Response> {
  104. switch self.factory {
  105. case let .http2(factory):
  106. let transport = factory.makeTransport(
  107. to: path,
  108. for: type,
  109. withOptions: options,
  110. interceptedBy: interceptors,
  111. onResponsePart
  112. )
  113. factory.configure(transport)
  114. return transport
  115. case let .fake(factory):
  116. let transport = factory.makeTransport(
  117. to: path,
  118. for: type,
  119. withOptions: options,
  120. interceptedBy: interceptors,
  121. onResponsePart
  122. )
  123. factory.configure(transport)
  124. return transport
  125. }
  126. }
  127. }
  128. private struct HTTP2ClientTransportFactory<Request, Response> {
  129. /// The multiplexer providing an HTTP/2 stream for the call.
  130. private var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
  131. /// The ":authority" pseudo-header.
  132. private var authority: String
  133. /// The ":scheme" pseudo-header.
  134. private var scheme: String
  135. /// An error delegate.
  136. private var errorDelegate: ClientErrorDelegate?
  137. /// A codec for serializing request messages and deserializing response parts.
  138. private var codec: ChannelHandler
  139. fileprivate init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  140. multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
  141. scheme: String,
  142. authority: String,
  143. serializer: Serializer,
  144. deserializer: Deserializer,
  145. errorDelegate: ClientErrorDelegate?
  146. ) where Serializer.Input == Request, Deserializer.Output == Response {
  147. self.multiplexer = multiplexer
  148. self.scheme = scheme
  149. self.authority = authority
  150. self.codec = GRPCClientCodecHandler(serializer: serializer, deserializer: deserializer)
  151. self.errorDelegate = errorDelegate
  152. }
  153. fileprivate func makeTransport<Request, Response>(
  154. to path: String,
  155. for type: GRPCCallType,
  156. withOptions options: CallOptions,
  157. interceptedBy interceptors: [ClientInterceptor<Request, Response>],
  158. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  159. ) -> ClientTransport<Request, Response> {
  160. return ClientTransport(
  161. details: self.makeCallDetails(type: type, path: path, options: options),
  162. eventLoop: self.multiplexer.eventLoop,
  163. interceptors: interceptors,
  164. errorDelegate: self.errorDelegate,
  165. onResponsePart
  166. )
  167. }
  168. fileprivate func configure<Request, Response>(_ transport: ClientTransport<Request, Response>) {
  169. transport.configure { _ in
  170. self.multiplexer.flatMap { multiplexer in
  171. let streamPromise = self.multiplexer.eventLoop.makePromise(of: Channel.self)
  172. multiplexer.createStreamChannel(promise: streamPromise) { streamChannel in
  173. streamChannel.pipeline.addHandlers([
  174. _GRPCClientChannelHandler(
  175. callType: transport.callDetails.type,
  176. logger: transport.logger
  177. ),
  178. self.codec,
  179. transport,
  180. ])
  181. }
  182. // We don't need the stream, but we do need to know it was correctly configured.
  183. return streamPromise.futureResult.map { _ in }
  184. }
  185. }
  186. }
  187. private func makeCallDetails(
  188. type: GRPCCallType,
  189. path: String,
  190. options: CallOptions
  191. ) -> CallDetails {
  192. return .init(
  193. type: type,
  194. path: path,
  195. authority: self.authority,
  196. scheme: self.scheme,
  197. options: options
  198. )
  199. }
  200. }
  201. private struct FakeClientTransportFactory<Request, Response> {
  202. /// The fake response stream for the call. This can be `nil` if the user did not correctly
  203. /// configure their client. The result will be a transport which immediately fails.
  204. private var fakeResponseStream: _FakeResponseStream<Request, Response>?
  205. /// The `EventLoop` from the response stream, or an `EmbeddedEventLoop` should the response
  206. /// stream be `nil`.
  207. private var eventLoop: EventLoop
  208. fileprivate init(
  209. _ fakeResponseStream: _FakeResponseStream<Request, Response>?,
  210. on eventLoop: EventLoop
  211. ) {
  212. self.fakeResponseStream = fakeResponseStream
  213. self.eventLoop = eventLoop
  214. }
  215. fileprivate func makeTransport<Request, Response>(
  216. to path: String,
  217. for type: GRPCCallType,
  218. withOptions options: CallOptions,
  219. interceptedBy interceptors: [ClientInterceptor<Request, Response>],
  220. _ onResponsePart: @escaping (ClientResponsePart<Response>) -> Void
  221. ) -> ClientTransport<Request, Response> {
  222. return ClientTransport(
  223. details: CallDetails(
  224. type: type,
  225. path: path,
  226. authority: "localhost",
  227. scheme: "http",
  228. options: options
  229. ),
  230. eventLoop: self.eventLoop,
  231. interceptors: interceptors,
  232. errorDelegate: nil,
  233. onResponsePart
  234. )
  235. }
  236. fileprivate func configure<Request, Response>(_ transport: ClientTransport<Request, Response>) {
  237. transport.configure { handler in
  238. if let fakeResponse = self.fakeResponseStream {
  239. return fakeResponse.channel.pipeline.addHandler(handler).always { result in
  240. switch result {
  241. case .success:
  242. fakeResponse.activate()
  243. case .failure:
  244. ()
  245. }
  246. }
  247. } else {
  248. return self.eventLoop.makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
  249. }
  250. }
  251. }
  252. }