/*
 * Copyright 2020, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  16. import NIO
  17. import NIOHTTP2
  18. import protocol SwiftProtobuf.Message
  /// A `ClientTransport` factory for an RPC.
  ///
  /// Wraps either an HTTP/2 backed factory or a 'fake' factory and vends transports which have
  /// already been configured by the wrapped factory.
  @usableFromInline
  internal struct ClientTransportFactory<Request, Response> {
    /// The underlying transport factory.
    private let factory: Factory

    /// The kinds of transport factory which may be wrapped.
    ///
    /// This is nested in the generic struct so it uses the enclosing `Request` and `Response`
    /// directly instead of redeclaring generic parameters with the same names.
    private enum Factory {
      case http2(HTTP2ClientTransportFactory<Request, Response>)
      case fake(FakeClientTransportFactory<Request, Response>)
    }

    private init(_ http2: HTTP2ClientTransportFactory<Request, Response>) {
      self.factory = .http2(http2)
    }

    private init(_ fake: FakeClientTransportFactory<Request, Response>) {
      self.factory = .fake(fake)
    }

    /// Create a transport factory for HTTP/2 based transport with `SwiftProtobuf.Message` messages.
    /// - Parameters:
    ///   - multiplexer: The multiplexer used to create an HTTP/2 stream for the RPC.
    ///   - authority: The value of the ":authority" pseudo header.
    ///   - scheme: The value of the ":scheme" pseudo header.
    ///   - maximumReceiveMessageLength: Maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    /// - Returns: A factory for making and configuring HTTP/2 based transport.
    internal static func http2<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
      multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
      authority: String,
      scheme: String,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) -> ClientTransportFactory<Request, Response> {
      let http2 = HTTP2ClientTransportFactory<Request, Response>(
        multiplexer: multiplexer,
        scheme: scheme,
        authority: authority,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        maximumReceiveMessageLength: maximumReceiveMessageLength,
        errorDelegate: errorDelegate
      )
      return .init(http2)
    }

    /// Create a transport factory for HTTP/2 based transport with `GRPCPayload` messages.
    /// - Parameters:
    ///   - multiplexer: The multiplexer used to create an HTTP/2 stream for the RPC.
    ///   - authority: The value of the ":authority" pseudo header.
    ///   - scheme: The value of the ":scheme" pseudo header.
    ///   - maximumReceiveMessageLength: Maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    /// - Returns: A factory for making and configuring HTTP/2 based transport.
    internal static func http2<Request: GRPCPayload, Response: GRPCPayload>(
      multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
      authority: String,
      scheme: String,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) -> ClientTransportFactory<Request, Response> {
      // The HTTP/2 factory's initializer type-erases the serializer and deserializer itself, so
      // the concrete types are passed here (as the other overloads do) rather than wrapping twice.
      let http2 = HTTP2ClientTransportFactory<Request, Response>(
        multiplexer: multiplexer,
        scheme: scheme,
        authority: authority,
        serializer: GRPCPayloadSerializer(),
        deserializer: GRPCPayloadDeserializer(),
        maximumReceiveMessageLength: maximumReceiveMessageLength,
        errorDelegate: errorDelegate
      )
      return .init(http2)
    }

    /// Make a factory for 'fake' transport with `SwiftProtobuf.Message` messages.
    /// - Parameter fakeResponse: The fake response stream.
    /// - Returns: A factory for making and configuring fake transport.
    internal static func fake<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
      _ fakeResponse: _FakeResponseStream<Request, Response>?
    ) -> ClientTransportFactory<Request, Response> {
      let factory = FakeClientTransportFactory(
        fakeResponse,
        requestSerializer: ProtobufSerializer(),
        requestDeserializer: ProtobufDeserializer(),
        responseSerializer: ProtobufSerializer(),
        responseDeserializer: ProtobufDeserializer()
      )
      return .init(factory)
    }

    /// Make a factory for 'fake' transport with `GRPCPayload` messages.
    /// - Parameter fakeResponse: The fake response stream.
    /// - Returns: A factory for making and configuring fake transport.
    internal static func fake<Request: GRPCPayload, Response: GRPCPayload>(
      _ fakeResponse: _FakeResponseStream<Request, Response>?
    ) -> ClientTransportFactory<Request, Response> {
      let factory = FakeClientTransportFactory(
        fakeResponse,
        requestSerializer: GRPCPayloadSerializer(),
        requestDeserializer: GRPCPayloadDeserializer(),
        responseSerializer: GRPCPayloadSerializer(),
        responseDeserializer: GRPCPayloadDeserializer()
      )
      return .init(factory)
    }

    /// Makes a configured `ClientTransport`.
    ///
    /// The transport is created by the wrapped factory and then handed back to that same factory
    /// to be configured before it is returned.
    ///
    /// - Parameters:
    ///   - path: The path of the RPC, e.g. "/echo.Echo/Get".
    ///   - type: The type of RPC, e.g. `.unary`.
    ///   - options: Options for the RPC.
    ///   - eventLoop: The event loop handed to the created transport.
    ///   - interceptors: Interceptors to use for the RPC.
    ///   - onError: A callback invoked when an error is received.
    ///   - onResponsePart: A closure called for each response part received.
    /// - Returns: A configured transport.
    internal func makeConfiguredTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      switch self.factory {
      case let .http2(factory):
        let transport = factory.makeTransport(
          to: path,
          for: type,
          withOptions: options,
          onEventLoop: eventLoop,
          interceptedBy: interceptors,
          onError: onError,
          onResponsePart: onResponsePart
        )
        factory.configure(transport)
        return transport

      case let .fake(factory):
        let transport = factory.makeTransport(
          to: path,
          for: type,
          withOptions: options,
          onEventLoop: eventLoop,
          interceptedBy: interceptors,
          onError: onError,
          onResponsePart
        )
        factory.configure(transport)
        return transport
      }
    }
  }
  /// A factory which makes `ClientTransport`s and configures them to run on an HTTP/2 stream
  /// created from a stream multiplexer.
  private struct HTTP2ClientTransportFactory<Request, Response> {
    /// The multiplexer providing an HTTP/2 stream for the call.
    private let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

    /// The ":authority" pseudo-header.
    private let authority: String

    /// The ":scheme" pseudo-header.
    private let scheme: String

    /// An error delegate.
    private let errorDelegate: ClientErrorDelegate?

    /// The request serializer.
    private let serializer: AnySerializer<Request>

    /// The response deserializer.
    private let deserializer: AnyDeserializer<Response>

    /// Maximum allowed length of a received message.
    private let maximumReceiveMessageLength: Int

    /// Creates a factory, type-erasing the provided serializer and deserializer.
    /// - Parameters:
    ///   - multiplexer: The multiplexer providing an HTTP/2 stream for each call.
    ///   - scheme: The value of the ":scheme" pseudo header.
    ///   - authority: The value of the ":authority" pseudo header.
    ///   - serializer: The request serializer.
    ///   - deserializer: The response deserializer.
    ///   - maximumReceiveMessageLength: Maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    fileprivate init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
      scheme: String,
      authority: String,
      serializer: Serializer,
      deserializer: Deserializer,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) where Serializer.Input == Request, Deserializer.Output == Response {
      self.multiplexer = multiplexer
      self.scheme = scheme
      self.authority = authority
      self.serializer = AnySerializer(wrapping: serializer)
      self.deserializer = AnyDeserializer(wrapping: deserializer)
      self.maximumReceiveMessageLength = maximumReceiveMessageLength
      self.errorDelegate = errorDelegate
    }

    /// Makes a transport for an RPC. The returned transport must still be passed to
    /// `configure(_:)` before it has a stream to run on.
    /// - Parameters:
    ///   - path: The path of the RPC.
    ///   - type: The type of RPC.
    ///   - options: Options for the RPC.
    ///   - eventLoop: The event loop handed to the created transport.
    ///   - interceptors: Interceptors to use for the RPC.
    ///   - onError: A callback invoked when an error is received.
    ///   - onResponsePart: A closure called for each response part received.
    /// - Returns: A transport using this factory's serializer, deserializer and error delegate.
    fileprivate func makeTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      return ClientTransport(
        details: self.makeCallDetails(type: type, path: path, options: options),
        eventLoop: eventLoop,
        interceptors: interceptors,
        serializer: self.serializer,
        deserializer: self.deserializer,
        errorDelegate: self.errorDelegate,
        onError: onError,
        onResponsePart: onResponsePart
      )
    }

    /// Configures `transport` by creating an HTTP/2 stream channel from the multiplexer and adding
    /// a `GRPCClientChannelHandler` followed by the transport itself to the stream's pipeline.
    ///
    /// The method uses the factory's own `Request` and `Response` types rather than redeclaring
    /// generic parameters with the same names, so only transports whose message types match the
    /// factory are accepted.
    ///
    /// - Parameter transport: The transport to configure.
    fileprivate func configure(_ transport: ClientTransport<Request, Response>) {
      transport.configure { _ in
        self.multiplexer.flatMap { multiplexer in
          let streamPromise = self.multiplexer.eventLoop.makePromise(of: Channel.self)

          multiplexer.createStreamChannel(promise: streamPromise) { streamChannel in
            // This initializer will always occur on the appropriate event loop, sync operations are
            // fine here.
            let syncOperations = streamChannel.pipeline.syncOperations

            do {
              let clientHandler = GRPCClientChannelHandler(
                callType: transport.callDetails.type,
                maximumReceiveMessageLength: self.maximumReceiveMessageLength,
                logger: transport.logger
              )
              try syncOperations.addHandler(clientHandler)
              try syncOperations.addHandler(transport)
            } catch {
              return streamChannel.eventLoop.makeFailedFuture(error)
            }

            return streamChannel.eventLoop.makeSucceededVoidFuture()
          }

          // We don't need the stream, but we do need to know it was correctly configured.
          return streamPromise.futureResult.map { _ in }
        }
      }
    }

    /// Makes the call details for an RPC using this factory's authority and scheme.
    private func makeCallDetails(
      type: GRPCCallType,
      path: String,
      options: CallOptions
    ) -> CallDetails {
      return .init(
        type: type,
        path: path,
        authority: self.authority,
        scheme: self.scheme,
        options: options
      )
    }
  }
  /// A factory which makes `ClientTransport`s and configures them to run against a fake response
  /// stream instead of a real connection.
  private struct FakeClientTransportFactory<Request, Response> {
    /// The fake response stream for the call. This can be `nil` if the user did not correctly
    /// configure their client. The result will be a transport which immediately fails.
    private let fakeResponseStream: _FakeResponseStream<Request, Response>?

    /// The request serializer.
    private let requestSerializer: AnySerializer<Request>

    /// The response deserializer.
    private let responseDeserializer: AnyDeserializer<Response>

    /// A codec for deserializing requests and serializing responses.
    private let codec: ChannelHandler

    /// Creates a factory for the given fake response stream.
    ///
    /// The request serializer and response deserializer are type-erased and later handed to each
    /// transport; the response serializer and request deserializer are used to build the reverse
    /// codec handler.
    ///
    /// - Parameters:
    ///   - fakeResponseStream: The fake response stream, or `nil` if none was provided.
    ///   - requestSerializer: Serializes requests for the transport.
    ///   - requestDeserializer: Deserializes requests in the reverse codec.
    ///   - responseSerializer: Serializes responses in the reverse codec.
    ///   - responseDeserializer: Deserializes responses for the transport.
    fileprivate init<
      RequestSerializer: MessageSerializer,
      RequestDeserializer: MessageDeserializer,
      ResponseSerializer: MessageSerializer,
      ResponseDeserializer: MessageDeserializer
    >(
      _ fakeResponseStream: _FakeResponseStream<Request, Response>?,
      requestSerializer: RequestSerializer,
      requestDeserializer: RequestDeserializer,
      responseSerializer: ResponseSerializer,
      responseDeserializer: ResponseDeserializer
    ) where RequestSerializer.Input == Request,
      RequestDeserializer.Output == Request,
      ResponseSerializer.Input == Response,
      ResponseDeserializer.Output == Response
    {
      self.fakeResponseStream = fakeResponseStream
      self.requestSerializer = AnySerializer(wrapping: requestSerializer)
      self.responseDeserializer = AnyDeserializer(wrapping: responseDeserializer)
      self.codec = GRPCClientReverseCodecHandler(
        serializer: responseSerializer,
        deserializer: requestDeserializer
      )
    }

    /// Makes a transport for an RPC. The call details always use "localhost" as the authority and
    /// "http" as the scheme, and no error delegate is set.
    /// - Parameters:
    ///   - path: The path of the RPC.
    ///   - type: The type of RPC.
    ///   - options: Options for the RPC.
    ///   - eventLoop: The event loop handed to the created transport.
    ///   - interceptors: Interceptors to use for the RPC.
    ///   - onError: A callback invoked when an error is received.
    ///   - onResponsePart: A closure called for each response part received.
    /// - Returns: A transport which must still be passed to `configure(_:)`.
    fileprivate func makeTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onError: @escaping (Error) -> Void,
      _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      let details = CallDetails(
        type: type,
        path: path,
        authority: "localhost",
        scheme: "http",
        options: options
      )

      return ClientTransport(
        details: details,
        eventLoop: eventLoop,
        interceptors: interceptors,
        serializer: self.requestSerializer,
        deserializer: self.responseDeserializer,
        errorDelegate: nil,
        onError: onError,
        onResponsePart: onResponsePart
      )
    }

    /// Configures `transport` to use the fake response stream.
    ///
    /// If a fake response stream is available, the codec and the handler provided by the
    /// transport are added to the stream's channel pipeline and the stream is activated once
    /// they have been added successfully. Otherwise configuration fails with an `.unavailable`
    /// status.
    ///
    /// The method uses the factory's own `Request` and `Response` types rather than redeclaring
    /// generic parameters with the same names, so only transports whose message types match the
    /// factory are accepted.
    ///
    /// - Parameter transport: The transport to configure.
    fileprivate func configure(_ transport: ClientTransport<Request, Response>) {
      transport.configure { handler in
        guard let fakeResponse = self.fakeResponseStream else {
          // No fake response stream was provided: fail configuration.
          return transport.callEventLoop
            .makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
        }

        return fakeResponse.channel.pipeline.addHandlers(self.codec, handler).always { result in
          // Only activate the stream if the handlers were added; a failure needs no action here
          // as it is reported through the returned future.
          if case .success = result {
            fakeResponse.activate()
          }
        }
      }
    }
  }