ClientTransportFactory.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHTTP2
  18. import protocol SwiftProtobuf.Message
  /// A `ClientTransport` factory for an RPC.
  ///
  /// This wraps either an HTTP/2 based factory or a 'fake' factory and offers a single entry
  /// point, `makeConfiguredTransport`, which makes a transport and then configures it using the
  /// wrapped factory.
  @usableFromInline
  internal struct ClientTransportFactory<Request, Response> {
    /// The underlying transport factory.
    private let factory: Factory<Request, Response>

    @usableFromInline
    internal enum Factory<Request, Response> {
      case http2(HTTP2ClientTransportFactory<Request, Response>)
      case fake(FakeClientTransportFactory<Request, Response>)
    }

    private init(_ http2: HTTP2ClientTransportFactory<Request, Response>) {
      self.factory = .http2(http2)
    }

    private init(_ fake: FakeClientTransportFactory<Request, Response>) {
      self.factory = .fake(fake)
    }

    /// Create a transport factory for HTTP/2 based transport with `SwiftProtobuf.Message` messages.
    /// - Parameters:
    ///   - channel: A future for the channel which the handlers for the RPC are added to.
    ///   - authority: The value of the ":authority" pseudo header.
    ///   - scheme: The value of the ":scheme" pseudo header.
    ///   - maximumReceiveMessageLength: Maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    /// - Returns: A factory for making and configuring HTTP/2 based transport.
    @usableFromInline
    internal static func http2<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
      channel: EventLoopFuture<Channel>,
      authority: String,
      scheme: String,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) -> ClientTransportFactory<Request, Response> {
      let http2 = HTTP2ClientTransportFactory<Request, Response>(
        streamChannel: channel,
        scheme: scheme,
        authority: authority,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        maximumReceiveMessageLength: maximumReceiveMessageLength,
        errorDelegate: errorDelegate
      )
      return .init(http2)
    }

    /// Create a transport factory for HTTP/2 based transport with `GRPCPayload` messages.
    /// - Parameters:
    ///   - channel: A future for the channel which the handlers for the RPC are added to.
    ///   - authority: The value of the ":authority" pseudo header.
    ///   - scheme: The value of the ":scheme" pseudo header.
    ///   - maximumReceiveMessageLength: Maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    /// - Returns: A factory for making and configuring HTTP/2 based transport.
    @usableFromInline
    internal static func http2<Request: GRPCPayload, Response: GRPCPayload>(
      channel: EventLoopFuture<Channel>,
      authority: String,
      scheme: String,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) -> ClientTransportFactory<Request, Response> {
      // The factory's initializer type-erases the serializer and deserializer itself, so the
      // concrete types are passed directly rather than being wrapped a second time.
      let http2 = HTTP2ClientTransportFactory<Request, Response>(
        streamChannel: channel,
        scheme: scheme,
        authority: authority,
        serializer: GRPCPayloadSerializer(),
        deserializer: GRPCPayloadDeserializer(),
        maximumReceiveMessageLength: maximumReceiveMessageLength,
        errorDelegate: errorDelegate
      )
      return .init(http2)
    }

    /// Make a factory for 'fake' transport with `SwiftProtobuf.Message` messages.
    /// - Parameter fakeResponse: The fake response stream.
    /// - Returns: A factory for making and configuring fake transport.
    @usableFromInline
    internal static func fake<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
      _ fakeResponse: _FakeResponseStream<Request, Response>?
    ) -> ClientTransportFactory<Request, Response> {
      let factory = FakeClientTransportFactory(
        fakeResponse,
        requestSerializer: ProtobufSerializer(),
        requestDeserializer: ProtobufDeserializer(),
        responseSerializer: ProtobufSerializer(),
        responseDeserializer: ProtobufDeserializer()
      )
      return .init(factory)
    }

    /// Make a factory for 'fake' transport with `GRPCPayload` messages.
    /// - Parameter fakeResponse: The fake response stream.
    /// - Returns: A factory for making and configuring fake transport.
    @usableFromInline
    internal static func fake<Request: GRPCPayload, Response: GRPCPayload>(
      _ fakeResponse: _FakeResponseStream<Request, Response>?
    ) -> ClientTransportFactory<Request, Response> {
      let factory = FakeClientTransportFactory(
        fakeResponse,
        requestSerializer: GRPCPayloadSerializer(),
        requestDeserializer: GRPCPayloadDeserializer(),
        responseSerializer: GRPCPayloadSerializer(),
        responseDeserializer: GRPCPayloadDeserializer()
      )
      return .init(factory)
    }

    /// Makes a configured `ClientTransport`.
    /// - Parameters:
    ///   - path: The path of the RPC, e.g. "/echo.Echo/Get".
    ///   - type: The type of RPC, e.g. `.unary`.
    ///   - options: Options for the RPC.
    ///   - eventLoop: The event loop passed to the transport when it is created.
    ///   - interceptors: Interceptors to use for the RPC.
    ///   - onError: A callback invoked when an error is received.
    ///   - onResponsePart: A closure called for each response part received.
    /// - Returns: A configured transport.
    internal func makeConfiguredTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      // Both kinds of factory follow the same two steps: make the transport, then configure it.
      switch self.factory {
      case let .http2(factory):
        let transport = factory.makeTransport(
          to: path,
          for: type,
          withOptions: options,
          onEventLoop: eventLoop,
          interceptedBy: interceptors,
          onError: onError,
          onResponsePart: onResponsePart
        )
        factory.configure(transport)
        return transport

      case let .fake(factory):
        let transport = factory.makeTransport(
          to: path,
          for: type,
          withOptions: options,
          onEventLoop: eventLoop,
          interceptedBy: interceptors,
          onError: onError,
          onResponsePart
        )
        factory.configure(transport)
        return transport
      }
    }
  }
  /// A factory which makes `ClientTransport`s and configures them by adding the transport and a
  /// `GRPCClientChannelHandler` to the pipeline of an HTTP/2 stream channel.
  @usableFromInline
  internal struct HTTP2ClientTransportFactory<Request, Response> {
    /// A future for the channel which the handlers for the call are added to.
    private let streamChannel: EventLoopFuture<Channel>

    /// The ":authority" pseudo-header.
    private let authority: String

    /// The ":scheme" pseudo-header.
    private let scheme: String

    /// An error delegate.
    private let errorDelegate: ClientErrorDelegate?

    /// The request serializer.
    private let serializer: AnySerializer<Request>

    /// The response deserializer.
    private let deserializer: AnyDeserializer<Response>

    /// Maximum allowed length of a received message.
    private let maximumReceiveMessageLength: Int

    /// Creates a factory for HTTP/2 based transport.
    /// - Parameters:
    ///   - streamChannel: A future for the channel which the handlers for the call are added to.
    ///   - scheme: The value of the ":scheme" pseudo header.
    ///   - authority: The value of the ":authority" pseudo header.
    ///   - serializer: The request serializer; it is type-erased before being stored.
    ///   - deserializer: The response deserializer; it is type-erased before being stored.
    ///   - maximumReceiveMessageLength: Maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    @usableFromInline
    internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      streamChannel: EventLoopFuture<Channel>,
      scheme: String,
      authority: String,
      serializer: Serializer,
      deserializer: Deserializer,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) where Serializer.Input == Request, Deserializer.Output == Response {
      self.streamChannel = streamChannel
      self.scheme = scheme
      self.authority = authority
      self.serializer = AnySerializer(wrapping: serializer)
      self.deserializer = AnyDeserializer(wrapping: deserializer)
      self.maximumReceiveMessageLength = maximumReceiveMessageLength
      self.errorDelegate = errorDelegate
    }

    /// Makes a `ClientTransport` for the RPC. The transport is not configured; see
    /// `configure(_:)`.
    /// - Parameters:
    ///   - path: The path of the RPC.
    ///   - type: The type of RPC.
    ///   - options: Options for the RPC.
    ///   - eventLoop: The event loop passed to the transport.
    ///   - interceptors: Interceptors to use for the RPC.
    ///   - onError: A callback invoked when an error is received.
    ///   - onResponsePart: A closure called for each response part received.
    /// - Returns: An unconfigured transport.
    fileprivate func makeTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      return ClientTransport(
        details: self.makeCallDetails(type: type, path: path, options: options),
        eventLoop: eventLoop,
        interceptors: interceptors,
        serializer: self.serializer,
        deserializer: self.deserializer,
        errorDelegate: self.errorDelegate,
        onError: onError,
        onResponsePart: onResponsePart
      )
    }

    /// Configures the transport: once the stream channel is available a
    /// `GRPCClientChannelHandler` followed by the transport are added to its pipeline.
    ///
    /// The transport uses the factory's own `Request` and `Response` types rather than
    /// separately declared (shadowing) generic parameters, so only transports with matching
    /// message types are accepted.
    /// - Parameter transport: The transport to configure.
    fileprivate func configure(_ transport: ClientTransport<Request, Response>) {
      transport.configure { _ in
        self.streamChannel.flatMapThrowing { channel in
          // This initializer will always occur on the appropriate event loop, sync operations are
          // fine here.
          let syncOperations = channel.pipeline.syncOperations

          let clientHandler = GRPCClientChannelHandler(
            callType: transport.callDetails.type,
            maximumReceiveMessageLength: self.maximumReceiveMessageLength,
            logger: transport.logger
          )

          // Errors thrown when adding either handler fail the future returned to the transport.
          try syncOperations.addHandler(clientHandler)
          try syncOperations.addHandler(transport)
        }
      }
    }

    /// Makes the `CallDetails` for an RPC using the factory's authority and scheme.
    private func makeCallDetails(
      type: GRPCCallType,
      path: String,
      options: CallOptions
    ) -> CallDetails {
      return .init(
        type: type,
        path: path,
        authority: self.authority,
        scheme: self.scheme,
        options: options
      )
    }
  }
  /// A factory which makes `ClientTransport`s and configures them to use a fake response stream
  /// instead of a real connection.
  @usableFromInline
  internal struct FakeClientTransportFactory<Request, Response> {
    /// The fake response stream for the call. This can be `nil` if the user did not correctly
    /// configure their client. The result will be a transport which immediately fails.
    private let fakeResponseStream: _FakeResponseStream<Request, Response>?

    /// The request serializer.
    private let requestSerializer: AnySerializer<Request>

    /// The response deserializer.
    private let responseDeserializer: AnyDeserializer<Response>

    /// A codec for deserializing requests and serializing responses.
    private let codec: ChannelHandler

    /// Creates a factory for 'fake' transport.
    /// - Parameters:
    ///   - fakeResponseStream: The fake response stream, or `nil` if none was configured.
    ///   - requestSerializer: Serializer for requests, used by the transport.
    ///   - requestDeserializer: Deserializer for requests, used by the reverse codec.
    ///   - responseSerializer: Serializer for responses, used by the reverse codec.
    ///   - responseDeserializer: Deserializer for responses, used by the transport.
    @usableFromInline
    internal init<
      RequestSerializer: MessageSerializer,
      RequestDeserializer: MessageDeserializer,
      ResponseSerializer: MessageSerializer,
      ResponseDeserializer: MessageDeserializer
    >(
      _ fakeResponseStream: _FakeResponseStream<Request, Response>?,
      requestSerializer: RequestSerializer,
      requestDeserializer: RequestDeserializer,
      responseSerializer: ResponseSerializer,
      responseDeserializer: ResponseDeserializer
    ) where RequestSerializer.Input == Request,
      RequestDeserializer.Output == Request,
      ResponseSerializer.Input == Response,
      ResponseDeserializer.Output == Response
    {
      self.fakeResponseStream = fakeResponseStream
      self.requestSerializer = AnySerializer(wrapping: requestSerializer)
      self.responseDeserializer = AnyDeserializer(wrapping: responseDeserializer)
      self.codec = GRPCClientReverseCodecHandler(
        serializer: responseSerializer,
        deserializer: requestDeserializer
      )
    }

    /// Makes a `ClientTransport` for the RPC. The transport is not configured; see
    /// `configure(_:)`.
    ///
    /// The call details always use "localhost" as the authority and "http" as the scheme, and
    /// no error delegate is set.
    /// - Parameters:
    ///   - path: The path of the RPC.
    ///   - type: The type of RPC.
    ///   - options: Options for the RPC.
    ///   - eventLoop: The event loop passed to the transport.
    ///   - interceptors: Interceptors to use for the RPC.
    ///   - onError: A callback invoked when an error is received.
    ///   - onResponsePart: A closure called for each response part received.
    /// - Returns: An unconfigured transport.
    fileprivate func makeTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onError: @escaping (Error) -> Void,
      _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      let details = CallDetails(
        type: type,
        path: path,
        authority: "localhost",
        scheme: "http",
        options: options
      )

      return ClientTransport(
        details: details,
        eventLoop: eventLoop,
        interceptors: interceptors,
        serializer: self.requestSerializer,
        deserializer: self.responseDeserializer,
        errorDelegate: nil,
        onError: onError,
        onResponsePart: onResponsePart
      )
    }

    /// Configures the transport to use the fake response stream.
    ///
    /// The codec and the handler provided by the transport are added to the pipeline of the fake
    /// response stream's channel; the stream is activated if that succeeds. When there is no
    /// fake response stream the transport is given a future failed with an `.unavailable` status.
    ///
    /// The transport uses the factory's own `Request` and `Response` types rather than
    /// separately declared (shadowing) generic parameters, so only transports with matching
    /// message types are accepted.
    /// - Parameter transport: The transport to configure.
    fileprivate func configure(_ transport: ClientTransport<Request, Response>) {
      transport.configure { handler in
        guard let fakeResponse = self.fakeResponseStream else {
          // No fake response stream was provided: fail immediately.
          return transport.callEventLoop
            .makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
        }

        return fakeResponse.channel.pipeline.addHandlers(self.codec, handler).always { result in
          switch result {
          case .success:
            fakeResponse.activate()
          case .failure:
            // Nothing to do here; the failure is carried by the returned future.
            break
          }
        }
      }
    }
  }