ClientTransportFactory.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHTTP2
  18. import protocol SwiftProtobuf.Message
  /// A `ClientTransport` factory for an RPC.
  ///
  /// This wraps either an HTTP/2 based factory or a 'fake' factory and offers a single entry
  /// point, `makeConfiguredTransport`, for making and configuring a transport with either.
  @usableFromInline
  internal struct ClientTransportFactory<Request, Response> {
    /// The underlying transport factory. This is set once at initialization and never changed.
    private let factory: Factory

    /// The kinds of transport factory which may be wrapped.
    @usableFromInline
    internal enum Factory {
      case http2(HTTP2ClientTransportFactory<Request, Response>)
      case fake(FakeClientTransportFactory<Request, Response>)
    }

    /// Wrap an HTTP/2 transport factory.
    private init(_ http2: HTTP2ClientTransportFactory<Request, Response>) {
      self.factory = .http2(http2)
    }

    /// Wrap a 'fake' transport factory.
    private init(_ fake: FakeClientTransportFactory<Request, Response>) {
      self.factory = .fake(fake)
    }

    /// Create a transport factory for HTTP/2 based transport with `SwiftProtobuf.Message` messages.
    ///
    /// - Parameters:
    ///   - channel: A future for the channel to whose pipeline the handlers for the RPC are added.
    ///   - authority: The value of the ":authority" pseudo header.
    ///   - scheme: The value of the ":scheme" pseudo header.
    ///   - maximumReceiveMessageLength: The maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    /// - Returns: A factory for making and configuring HTTP/2 based transport.
    @usableFromInline
    internal static func http2(
      channel: EventLoopFuture<Channel>,
      authority: String,
      scheme: String,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) -> ClientTransportFactory<Request, Response>
      where
      Request: SwiftProtobuf.Message,
      Response: SwiftProtobuf.Message
    {
      let http2 = HTTP2ClientTransportFactory<Request, Response>(
        streamChannel: channel,
        scheme: scheme,
        authority: authority,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        maximumReceiveMessageLength: maximumReceiveMessageLength,
        errorDelegate: errorDelegate
      )
      return .init(http2)
    }

    /// Create a transport factory for HTTP/2 based transport with `GRPCPayload` messages.
    ///
    /// - Parameters:
    ///   - channel: A future for the channel to whose pipeline the handlers for the RPC are added.
    ///   - authority: The value of the ":authority" pseudo header.
    ///   - scheme: The value of the ":scheme" pseudo header.
    ///   - maximumReceiveMessageLength: The maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    /// - Returns: A factory for making and configuring HTTP/2 based transport.
    @usableFromInline
    internal static func http2(
      channel: EventLoopFuture<Channel>,
      authority: String,
      scheme: String,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) -> ClientTransportFactory<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
      // The serializer and deserializer are passed unwrapped: the `HTTP2ClientTransportFactory`
      // initializer type-erases them itself, so wrapping them here as well would only add a
      // second, redundant layer of indirection.
      let http2 = HTTP2ClientTransportFactory<Request, Response>(
        streamChannel: channel,
        scheme: scheme,
        authority: authority,
        serializer: GRPCPayloadSerializer(),
        deserializer: GRPCPayloadDeserializer(),
        maximumReceiveMessageLength: maximumReceiveMessageLength,
        errorDelegate: errorDelegate
      )
      return .init(http2)
    }

    /// Make a factory for 'fake' transport with `SwiftProtobuf.Message` messages.
    ///
    /// - Parameter fakeResponse: The fake response stream. If this is `nil` then configuring a
    ///   transport made by the returned factory fails with an `.unavailable` status.
    /// - Returns: A factory for making and configuring fake transport.
    @usableFromInline
    internal static func fake(
      _ fakeResponse: _FakeResponseStream<Request, Response>?
    ) -> ClientTransportFactory<Request, Response>
      where
      Request: SwiftProtobuf.Message,
      Response: SwiftProtobuf.Message
    {
      let factory = FakeClientTransportFactory(
        fakeResponse,
        requestSerializer: ProtobufSerializer(),
        requestDeserializer: ProtobufDeserializer(),
        responseSerializer: ProtobufSerializer(),
        responseDeserializer: ProtobufDeserializer()
      )
      return .init(factory)
    }

    /// Make a factory for 'fake' transport with `GRPCPayload` messages.
    ///
    /// - Parameter fakeResponse: The fake response stream. If this is `nil` then configuring a
    ///   transport made by the returned factory fails with an `.unavailable` status.
    /// - Returns: A factory for making and configuring fake transport.
    @usableFromInline
    internal static func fake(
      _ fakeResponse: _FakeResponseStream<Request, Response>?
    ) -> ClientTransportFactory<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
      let factory = FakeClientTransportFactory(
        fakeResponse,
        requestSerializer: GRPCPayloadSerializer(),
        requestDeserializer: GRPCPayloadDeserializer(),
        responseSerializer: GRPCPayloadSerializer(),
        responseDeserializer: GRPCPayloadDeserializer()
      )
      return .init(factory)
    }

    /// Makes a configured `ClientTransport`.
    ///
    /// - Parameters:
    ///   - path: The path of the RPC, e.g. "/echo.Echo/Get".
    ///   - type: The type of RPC, e.g. `.unary`.
    ///   - options: Options for the RPC.
    ///   - eventLoop: The event loop handed to the transport.
    ///   - interceptors: Interceptors to use for the RPC.
    ///   - onStart: A callback handed to HTTP/2 based transport as its `onStart` closure. It is
    ///     not used for 'fake' transport.
    ///   - onError: A callback invoked when an error is received.
    ///   - onResponsePart: A closure called for each response part received.
    /// - Returns: A configured transport.
    internal func makeConfiguredTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onStart: @escaping () -> Void,
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      switch self.factory {
      case let .http2(factory):
        let transport = factory.makeTransport(
          to: path,
          for: type,
          withOptions: options,
          onEventLoop: eventLoop,
          interceptedBy: interceptors,
          onStart: onStart,
          onError: onError,
          onResponsePart: onResponsePart
        )
        factory.configure(transport)
        return transport

      case let .fake(factory):
        // NOTE(review): `onStart` is not forwarded here; the fake factory always creates its
        // transport with an empty `onStart` closure. Confirm that this is intentional.
        let transport = factory.makeTransport(
          to: path,
          for: type,
          withOptions: options,
          onEventLoop: eventLoop,
          interceptedBy: interceptors,
          onError: onError,
          onResponsePart
        )
        factory.configure(transport)
        return transport
      }
    }
  }
  /// A factory for `ClientTransport`s whose handlers are added to the pipeline of a `Channel`
  /// provided via a future.
  @usableFromInline
  internal struct HTTP2ClientTransportFactory<Request, Response> {
    /// A future for the channel to whose pipeline the handlers for the call are added.
    private let streamChannel: EventLoopFuture<Channel>

    /// The ":authority" pseudo-header.
    private let authority: String

    /// The ":scheme" pseudo-header.
    private let scheme: String

    /// An error delegate.
    private let errorDelegate: ClientErrorDelegate?

    /// The request serializer.
    private let serializer: AnySerializer<Request>

    /// The response deserializer.
    private let deserializer: AnyDeserializer<Response>

    /// Maximum allowed length of a received message.
    private let maximumReceiveMessageLength: Int

    /// Creates a factory for HTTP/2 based transport.
    ///
    /// - Parameters:
    ///   - streamChannel: A future for the channel to whose pipeline the handlers for the call
    ///     are added.
    ///   - scheme: The value of the ":scheme" pseudo-header.
    ///   - authority: The value of the ":authority" pseudo-header.
    ///   - serializer: The request serializer; it is type-erased by this initializer.
    ///   - deserializer: The response deserializer; it is type-erased by this initializer.
    ///   - maximumReceiveMessageLength: Maximum allowed length of a received message.
    ///   - errorDelegate: An error delegate.
    @usableFromInline
    internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      streamChannel: EventLoopFuture<Channel>,
      scheme: String,
      authority: String,
      serializer: Serializer,
      deserializer: Deserializer,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) where Serializer.Input == Request, Deserializer.Output == Response {
      self.streamChannel = streamChannel
      self.scheme = scheme
      self.authority = authority
      self.serializer = AnySerializer(wrapping: serializer)
      self.deserializer = AnyDeserializer(wrapping: deserializer)
      self.maximumReceiveMessageLength = maximumReceiveMessageLength
      self.errorDelegate = errorDelegate
    }

    /// Makes a transport for an RPC. The transport is not configured; see `configure(_:)`.
    ///
    /// - Parameters:
    ///   - path: The path of the RPC.
    ///   - type: The type of the RPC.
    ///   - options: Options for the RPC.
    ///   - eventLoop: The event loop handed to the transport.
    ///   - interceptors: Interceptors to use for the RPC.
    ///   - onStart: The `onStart` closure handed to the transport.
    ///   - onError: The `onError` closure handed to the transport.
    ///   - onResponsePart: The `onResponsePart` closure handed to the transport.
    /// - Returns: An unconfigured transport.
    fileprivate func makeTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onStart: @escaping () -> Void,
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      return ClientTransport(
        details: self.makeCallDetails(type: type, path: path, options: options),
        eventLoop: eventLoop,
        interceptors: interceptors,
        serializer: self.serializer,
        deserializer: self.deserializer,
        errorDelegate: self.errorDelegate,
        onStart: onStart,
        onError: onError,
        onResponsePart: onResponsePart
      )
    }

    /// Configures `transport` by adding a `GRPCClientChannelHandler` followed by the transport
    /// itself to the pipeline of the stream channel, once that channel is available.
    ///
    /// - Parameter transport: The transport to configure.
    fileprivate func configure(_ transport: ClientTransport<Request, Response>) {
      // The handler offered to the closure is ignored; `transport` is added to the pipeline
      // directly.
      transport.configure { _ in
        self.streamChannel.flatMapThrowing { channel in
          // This initializer will always occur on the appropriate event loop, sync operations are
          // fine here.
          let syncOperations = channel.pipeline.syncOperations

          let clientHandler = GRPCClientChannelHandler(
            callType: transport.callDetails.type,
            maximumReceiveMessageLength: self.maximumReceiveMessageLength,
            logger: transport.logger
          )

          // An error thrown while adding either handler fails the returned future.
          try syncOperations.addHandler(clientHandler)
          try syncOperations.addHandler(transport)
        }
      }
    }

    /// Makes the details for a call using the ":authority" and ":scheme" held by this factory.
    ///
    /// - Parameters:
    ///   - type: The type of the RPC.
    ///   - path: The path of the RPC.
    ///   - options: Options for the RPC.
    /// - Returns: The details of the call.
    private func makeCallDetails(
      type: GRPCCallType,
      path: String,
      options: CallOptions
    ) -> CallDetails {
      return .init(
        type: type,
        path: path,
        authority: self.authority,
        scheme: self.scheme,
        options: options
      )
    }
  }
  /// A factory for `ClientTransport`s which are backed by a fake response stream rather than a
  /// connection.
  @usableFromInline
  internal struct FakeClientTransportFactory<Request, Response> {
    /// The fake response stream for the call. This can be `nil` if the user did not correctly
    /// configure their client. The result will be a transport which immediately fails.
    private var fakeResponseStream: _FakeResponseStream<Request, Response>?

    /// Serializes requests for the transport.
    private let requestSerializer: AnySerializer<Request>

    /// Deserializes responses for the transport.
    private let responseDeserializer: AnyDeserializer<Response>

    /// A codec for deserializing requests and serializing responses.
    private let codec: ChannelHandler

    /// Creates a factory for fake transport.
    ///
    /// - Parameters:
    ///   - fakeResponseStream: The fake response stream for the call, if there is one.
    ///   - requestSerializer: The request serializer handed to the transport.
    ///   - requestDeserializer: The request deserializer used by the reverse codec.
    ///   - responseSerializer: The response serializer used by the reverse codec.
    ///   - responseDeserializer: The response deserializer handed to the transport.
    @usableFromInline
    internal init<
      RequestSerializer: MessageSerializer,
      RequestDeserializer: MessageDeserializer,
      ResponseSerializer: MessageSerializer,
      ResponseDeserializer: MessageDeserializer
    >(
      _ fakeResponseStream: _FakeResponseStream<Request, Response>?,
      requestSerializer: RequestSerializer,
      requestDeserializer: RequestDeserializer,
      responseSerializer: ResponseSerializer,
      responseDeserializer: ResponseDeserializer
    )
      where
      RequestSerializer.Input == Request,
      RequestDeserializer.Output == Request,
      ResponseSerializer.Input == Response,
      ResponseDeserializer.Output == Response
    {
      // The reverse codec takes the "other half" of each pair: it serializes responses and
      // deserializes requests.
      self.codec = GRPCClientReverseCodecHandler(
        serializer: responseSerializer,
        deserializer: requestDeserializer
      )
      self.requestSerializer = AnySerializer(wrapping: requestSerializer)
      self.responseDeserializer = AnyDeserializer(wrapping: responseDeserializer)
      self.fakeResponseStream = fakeResponseStream
    }

    /// Makes a transport for an RPC. The transport is not configured; see `configure(_:)`.
    ///
    /// The call details always use "localhost" as the authority and "http" as the scheme, and the
    /// transport is made without an error delegate and with an empty `onStart` closure.
    ///
    /// - Parameters:
    ///   - path: The path of the RPC.
    ///   - type: The type of the RPC.
    ///   - options: Options for the RPC.
    ///   - eventLoop: The event loop handed to the transport.
    ///   - interceptors: Interceptors to use for the RPC.
    ///   - onError: The `onError` closure handed to the transport.
    ///   - onResponsePart: The `onResponsePart` closure handed to the transport.
    /// - Returns: An unconfigured transport.
    fileprivate func makeTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onError: @escaping (Error) -> Void,
      _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      let details = CallDetails(
        type: type,
        path: path,
        authority: "localhost",
        scheme: "http",
        options: options
      )

      return ClientTransport(
        details: details,
        eventLoop: eventLoop,
        interceptors: interceptors,
        serializer: self.requestSerializer,
        deserializer: self.responseDeserializer,
        errorDelegate: nil,
        onStart: {},
        onError: onError,
        onResponsePart: onResponsePart
      )
    }

    /// Configures `transport` by adding the codec and the transport's handler to the pipeline of
    /// the fake response stream's channel, activating the stream if the handlers were added.
    ///
    /// If there is no fake response stream the configuration fails with an `.unavailable` status.
    ///
    /// - Parameter transport: The transport to configure.
    fileprivate func configure(_ transport: ClientTransport<Request, Response>) {
      transport.configure { handler in
        // Without a response stream there is nothing to attach the handlers to.
        guard let stream = self.fakeResponseStream else {
          return transport.callEventLoop
            .makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
        }

        let handlersAdded = stream.channel.pipeline.addHandlers(self.codec, handler)
        return handlersAdded.always { outcome in
          // Only activate the stream if the handlers made it into the pipeline; a failure is
          // left untouched for the caller to observe via the returned future.
          if case .success = outcome {
            stream.activate()
          }
        }
      }
    }
  }
  345. }