ClientTransportFactory.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHTTP2
  18. import protocol SwiftProtobuf.Message
  19. /// A `ClientTransport` factory for an RPC.
  20. @usableFromInline
  21. internal struct ClientTransportFactory<Request, Response> {
  22. /// The underlying transport factory.
  23. private var factory: Factory<Request, Response>
  24. private enum Factory<Request, Response> {
  25. case http2(HTTP2ClientTransportFactory<Request, Response>)
  26. case fake(FakeClientTransportFactory<Request, Response>)
  27. }
  28. private init(_ http2: HTTP2ClientTransportFactory<Request, Response>) {
  29. self.factory = .http2(http2)
  30. }
  31. private init(_ fake: FakeClientTransportFactory<Request, Response>) {
  32. self.factory = .fake(fake)
  33. }
  34. /// Create a transport factory for HTTP/2 based transport with `SwiftProtobuf.Message` messages.
  35. /// - Parameters:
  36. /// - multiplexer: The multiplexer used to create an HTTP/2 stream for the RPC.
  37. /// - host: The value of the ":authority" pseudo header.
  38. /// - scheme: The value of the ":scheme" pseudo header.
  39. /// - errorDelegate: A client error delegate.
  40. /// - Returns: A factory for making and configuring HTTP/2 based transport.
  41. internal static func http2<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  42. multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
  43. authority: String,
  44. scheme: String,
  45. errorDelegate: ClientErrorDelegate?
  46. ) -> ClientTransportFactory<Request, Response> {
  47. let http2 = HTTP2ClientTransportFactory<Request, Response>(
  48. multiplexer: multiplexer,
  49. scheme: scheme,
  50. authority: authority,
  51. serializer: ProtobufSerializer(),
  52. deserializer: ProtobufDeserializer(),
  53. errorDelegate: errorDelegate
  54. )
  55. return .init(http2)
  56. }
  57. /// Create a transport factory for HTTP/2 based transport with `GRPCPayload` messages.
  58. /// - Parameters:
  59. /// - multiplexer: The multiplexer used to create an HTTP/2 stream for the RPC.
  60. /// - host: The value of the ":authority" pseudo header.
  61. /// - scheme: The value of the ":scheme" pseudo header.
  62. /// - errorDelegate: A client error delegate.
  63. /// - Returns: A factory for making and configuring HTTP/2 based transport.
  64. internal static func http2<Request: GRPCPayload, Response: GRPCPayload>(
  65. multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
  66. authority: String,
  67. scheme: String,
  68. errorDelegate: ClientErrorDelegate?
  69. ) -> ClientTransportFactory<Request, Response> {
  70. let http2 = HTTP2ClientTransportFactory<Request, Response>(
  71. multiplexer: multiplexer,
  72. scheme: scheme,
  73. authority: authority,
  74. serializer: AnySerializer(wrapping: GRPCPayloadSerializer()),
  75. deserializer: AnyDeserializer(wrapping: GRPCPayloadDeserializer()),
  76. errorDelegate: errorDelegate
  77. )
  78. return .init(http2)
  79. }
  80. /// Make a factory for 'fake' transport.
  81. /// - Parameter fakeResponse: The fake response stream.
  82. /// - Returns: A factory for making and configuring fake transport.
  83. internal static func fake<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  84. _ fakeResponse: _FakeResponseStream<Request, Response>?
  85. ) -> ClientTransportFactory<Request, Response> {
  86. let factory = FakeClientTransportFactory(
  87. fakeResponse,
  88. requestSerializer: ProtobufSerializer(),
  89. requestDeserializer: ProtobufDeserializer(),
  90. responseSerializer: ProtobufSerializer(),
  91. responseDeserializer: ProtobufDeserializer()
  92. )
  93. return .init(factory)
  94. }
  95. /// Make a factory for 'fake' transport.
  96. /// - Parameter fakeResponse: The fake response stream.
  97. /// - Returns: A factory for making and configuring fake transport.
  98. internal static func fake<Request: GRPCPayload, Response: GRPCPayload>(
  99. _ fakeResponse: _FakeResponseStream<Request, Response>?
  100. ) -> ClientTransportFactory<Request, Response> {
  101. let factory = FakeClientTransportFactory(
  102. fakeResponse,
  103. requestSerializer: GRPCPayloadSerializer(),
  104. requestDeserializer: GRPCPayloadDeserializer(),
  105. responseSerializer: GRPCPayloadSerializer(),
  106. responseDeserializer: GRPCPayloadDeserializer()
  107. )
  108. return .init(factory)
  109. }
  110. /// Makes a configured `ClientTransport`.
  111. /// - Parameters:
  112. /// - path: The path of the RPC, e.g. "/echo.Echo/Get".
  113. /// - type: The type of RPC, e.g. `.unary`.
  114. /// - options: Options for the RPC.
  115. /// - interceptors: Interceptors to use for the RPC.
  116. /// - onError: A callback invoked when an error is received.
  117. /// - onResponsePart: A closure called for each response part received.
  118. /// - Returns: A configured transport.
  119. internal func makeConfiguredTransport(
  120. to path: String,
  121. for type: GRPCCallType,
  122. withOptions options: CallOptions,
  123. onEventLoop eventLoop: EventLoop,
  124. interceptedBy interceptors: [ClientInterceptor<Request, Response>],
  125. onError: @escaping (Error) -> Void,
  126. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  127. ) -> ClientTransport<Request, Response> {
  128. switch self.factory {
  129. case let .http2(factory):
  130. let transport = factory.makeTransport(
  131. to: path,
  132. for: type,
  133. withOptions: options,
  134. onEventLoop: eventLoop,
  135. interceptedBy: interceptors,
  136. onError: onError,
  137. onResponsePart: onResponsePart
  138. )
  139. factory.configure(transport)
  140. return transport
  141. case let .fake(factory):
  142. let transport = factory.makeTransport(
  143. to: path,
  144. for: type,
  145. withOptions: options,
  146. onEventLoop: eventLoop,
  147. interceptedBy: interceptors,
  148. onError: onError,
  149. onResponsePart
  150. )
  151. factory.configure(transport)
  152. return transport
  153. }
  154. }
  155. }
  156. private struct HTTP2ClientTransportFactory<Request, Response> {
  157. /// The multiplexer providing an HTTP/2 stream for the call.
  158. private var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
  159. /// The ":authority" pseudo-header.
  160. private var authority: String
  161. /// The ":scheme" pseudo-header.
  162. private var scheme: String
  163. /// An error delegate.
  164. private var errorDelegate: ClientErrorDelegate?
  165. /// The request serializer.
  166. private let serializer: AnySerializer<Request>
  167. /// The response deserializer.
  168. private let deserializer: AnyDeserializer<Response>
  169. fileprivate init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  170. multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
  171. scheme: String,
  172. authority: String,
  173. serializer: Serializer,
  174. deserializer: Deserializer,
  175. errorDelegate: ClientErrorDelegate?
  176. ) where Serializer.Input == Request, Deserializer.Output == Response {
  177. self.multiplexer = multiplexer
  178. self.scheme = scheme
  179. self.authority = authority
  180. self.serializer = AnySerializer(wrapping: serializer)
  181. self.deserializer = AnyDeserializer(wrapping: deserializer)
  182. self.errorDelegate = errorDelegate
  183. }
  184. fileprivate func makeTransport(
  185. to path: String,
  186. for type: GRPCCallType,
  187. withOptions options: CallOptions,
  188. onEventLoop eventLoop: EventLoop,
  189. interceptedBy interceptors: [ClientInterceptor<Request, Response>],
  190. onError: @escaping (Error) -> Void,
  191. onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  192. ) -> ClientTransport<Request, Response> {
  193. return ClientTransport(
  194. details: self.makeCallDetails(type: type, path: path, options: options),
  195. eventLoop: eventLoop,
  196. interceptors: interceptors,
  197. serializer: self.serializer,
  198. deserializer: self.deserializer,
  199. errorDelegate: self.errorDelegate,
  200. onError: onError,
  201. onResponsePart: onResponsePart
  202. )
  203. }
  204. fileprivate func configure<Request, Response>(_ transport: ClientTransport<Request, Response>) {
  205. transport.configure { _ in
  206. self.multiplexer.flatMap { multiplexer in
  207. let streamPromise = self.multiplexer.eventLoop.makePromise(of: Channel.self)
  208. multiplexer.createStreamChannel(promise: streamPromise) { streamChannel in
  209. // This initializer will always occur on the appropriate event loop, sync operations are
  210. // fine here.
  211. let syncOperations = streamChannel.pipeline.syncOperations
  212. do {
  213. let clientHandler = GRPCClientChannelHandler(
  214. callType: transport.callDetails.type,
  215. logger: transport.logger
  216. )
  217. try syncOperations.addHandler(clientHandler)
  218. try syncOperations.addHandler(transport)
  219. } catch {
  220. return streamChannel.eventLoop.makeFailedFuture(error)
  221. }
  222. return streamChannel.eventLoop.makeSucceededVoidFuture()
  223. }
  224. // We don't need the stream, but we do need to know it was correctly configured.
  225. return streamPromise.futureResult.map { _ in }
  226. }
  227. }
  228. }
  229. private func makeCallDetails(
  230. type: GRPCCallType,
  231. path: String,
  232. options: CallOptions
  233. ) -> CallDetails {
  234. return .init(
  235. type: type,
  236. path: path,
  237. authority: self.authority,
  238. scheme: self.scheme,
  239. options: options
  240. )
  241. }
  242. }
  243. private struct FakeClientTransportFactory<Request, Response> {
  244. /// The fake response stream for the call. This can be `nil` if the user did not correctly
  245. /// configure their client. The result will be a transport which immediately fails.
  246. private var fakeResponseStream: _FakeResponseStream<Request, Response>?
  247. /// The request serializer.
  248. private let requestSerializer: AnySerializer<Request>
  249. /// The response deserializer.
  250. private let responseDeserializer: AnyDeserializer<Response>
  251. /// A codec for deserializing requests and serializing responses.
  252. private let codec: ChannelHandler
  253. fileprivate init<
  254. RequestSerializer: MessageSerializer,
  255. RequestDeserializer: MessageDeserializer,
  256. ResponseSerializer: MessageSerializer,
  257. ResponseDeserializer: MessageDeserializer
  258. >(
  259. _ fakeResponseStream: _FakeResponseStream<Request, Response>?,
  260. requestSerializer: RequestSerializer,
  261. requestDeserializer: RequestDeserializer,
  262. responseSerializer: ResponseSerializer,
  263. responseDeserializer: ResponseDeserializer
  264. ) where RequestSerializer.Input == Request,
  265. RequestDeserializer.Output == Request,
  266. ResponseSerializer.Input == Response,
  267. ResponseDeserializer.Output == Response
  268. {
  269. self.fakeResponseStream = fakeResponseStream
  270. self.requestSerializer = AnySerializer(wrapping: requestSerializer)
  271. self.responseDeserializer = AnyDeserializer(wrapping: responseDeserializer)
  272. self.codec = GRPCClientReverseCodecHandler(
  273. serializer: responseSerializer,
  274. deserializer: requestDeserializer
  275. )
  276. }
  277. fileprivate func makeTransport(
  278. to path: String,
  279. for type: GRPCCallType,
  280. withOptions options: CallOptions,
  281. onEventLoop eventLoop: EventLoop,
  282. interceptedBy interceptors: [ClientInterceptor<Request, Response>],
  283. onError: @escaping (Error) -> Void,
  284. _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
  285. ) -> ClientTransport<Request, Response> {
  286. return ClientTransport(
  287. details: CallDetails(
  288. type: type,
  289. path: path,
  290. authority: "localhost",
  291. scheme: "http",
  292. options: options
  293. ),
  294. eventLoop: eventLoop,
  295. interceptors: interceptors,
  296. serializer: self.requestSerializer,
  297. deserializer: self.responseDeserializer,
  298. errorDelegate: nil,
  299. onError: onError,
  300. onResponsePart: onResponsePart
  301. )
  302. }
  303. fileprivate func configure<Request, Response>(_ transport: ClientTransport<Request, Response>) {
  304. transport.configure { handler in
  305. if let fakeResponse = self.fakeResponseStream {
  306. return fakeResponse.channel.pipeline.addHandlers(self.codec, handler).always { result in
  307. switch result {
  308. case .success:
  309. fakeResponse.activate()
  310. case .failure:
  311. ()
  312. }
  313. }
  314. } else {
  315. return transport.callEventLoop
  316. .makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
  317. }
  318. }
  319. }
  320. }