ClientTransportFactory.swift 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHTTP2
  18. import protocol SwiftProtobuf.Message
  /// A `ClientTransport` factory for an RPC.
  ///
  /// The factory wraps one of two concrete factories: one which makes HTTP/2 based transport and
  /// one which makes 'fake' transport driven by a `_FakeResponseStream`.
  @usableFromInline
  internal struct ClientTransportFactory<Request, Response> {
    /// The underlying transport factory. It is assigned once, at initialization.
    private let factory: Factory

    /// The kinds of transport factory which may be wrapped.
    @usableFromInline
    internal enum Factory {
      case http2(HTTP2ClientTransportFactory<Request, Response>)
      case fake(FakeClientTransportFactory<Request, Response>)
    }

    /// Wraps an HTTP/2 transport factory.
    private init(_ http2: HTTP2ClientTransportFactory<Request, Response>) {
      self.factory = .http2(http2)
    }

    /// Wraps a fake transport factory.
    private init(_ fake: FakeClientTransportFactory<Request, Response>) {
      self.factory = .fake(fake)
    }

    /// Create a transport factory for HTTP/2 based transport with `SwiftProtobuf.Message` messages.
    /// - Parameters:
    ///   - channel: A future for the `Channel` whose pipeline the handlers for the RPC are added to.
    ///   - authority: The value of the ":authority" pseudo header.
    ///   - scheme: The value of the ":scheme" pseudo header.
    ///   - maximumReceiveMessageLength: Maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    /// - Returns: A factory for making and configuring HTTP/2 based transport.
    @usableFromInline
    internal static func http2(
      channel: EventLoopFuture<Channel>,
      authority: String,
      scheme: String,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) -> ClientTransportFactory<Request, Response>
      where Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message {
      let http2 = HTTP2ClientTransportFactory<Request, Response>(
        streamChannel: channel,
        scheme: scheme,
        authority: authority,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        maximumReceiveMessageLength: maximumReceiveMessageLength,
        errorDelegate: errorDelegate
      )
      return .init(http2)
    }

    /// Create a transport factory for HTTP/2 based transport with `GRPCPayload` messages.
    /// - Parameters:
    ///   - channel: A future for the `Channel` whose pipeline the handlers for the RPC are added to.
    ///   - authority: The value of the ":authority" pseudo header.
    ///   - scheme: The value of the ":scheme" pseudo header.
    ///   - maximumReceiveMessageLength: Maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    /// - Returns: A factory for making and configuring HTTP/2 based transport.
    @usableFromInline
    internal static func http2(
      channel: EventLoopFuture<Channel>,
      authority: String,
      scheme: String,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) -> ClientTransportFactory<Request, Response>
      where Request: GRPCPayload, Response: GRPCPayload {
      // The HTTP/2 factory's initializer type-erases the serializer and deserializer itself, so
      // the concrete values are passed directly rather than being wrapped a second time here.
      let http2 = HTTP2ClientTransportFactory<Request, Response>(
        streamChannel: channel,
        scheme: scheme,
        authority: authority,
        serializer: GRPCPayloadSerializer(),
        deserializer: GRPCPayloadDeserializer(),
        maximumReceiveMessageLength: maximumReceiveMessageLength,
        errorDelegate: errorDelegate
      )
      return .init(http2)
    }

    /// Make a factory for 'fake' transport with `SwiftProtobuf.Message` messages.
    /// - Parameter fakeResponse: The fake response stream, or `nil` if none was configured.
    /// - Returns: A factory for making and configuring fake transport.
    @usableFromInline
    internal static func fake(
      _ fakeResponse: _FakeResponseStream<Request, Response>?
    ) -> ClientTransportFactory<Request, Response>
      where Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message {
      let factory = FakeClientTransportFactory(
        fakeResponse,
        requestSerializer: ProtobufSerializer(),
        requestDeserializer: ProtobufDeserializer(),
        responseSerializer: ProtobufSerializer(),
        responseDeserializer: ProtobufDeserializer()
      )
      return .init(factory)
    }

    /// Make a factory for 'fake' transport with `GRPCPayload` messages.
    /// - Parameter fakeResponse: The fake response stream, or `nil` if none was configured.
    /// - Returns: A factory for making and configuring fake transport.
    @usableFromInline
    internal static func fake(
      _ fakeResponse: _FakeResponseStream<Request, Response>?
    ) -> ClientTransportFactory<Request, Response>
      where Request: GRPCPayload, Response: GRPCPayload {
      let factory = FakeClientTransportFactory(
        fakeResponse,
        requestSerializer: GRPCPayloadSerializer(),
        requestDeserializer: GRPCPayloadDeserializer(),
        responseSerializer: GRPCPayloadSerializer(),
        responseDeserializer: GRPCPayloadDeserializer()
      )
      return .init(factory)
    }

    /// Makes a configured `ClientTransport`.
    /// - Parameters:
    ///   - path: The path of the RPC, e.g. "/echo.Echo/Get".
    ///   - type: The type of RPC, e.g. `.unary`.
    ///   - options: Options for the RPC.
    ///   - eventLoop: The event loop handed to the transport.
    ///   - interceptors: Interceptors to use for the RPC.
    ///   - onStart: A closure handed to HTTP/2 based transport. It is not forwarded to fake
    ///     transport.
    ///   - onError: A callback invoked when an error is received.
    ///   - onResponsePart: A closure called for each response part received.
    /// - Returns: A configured transport.
    internal func makeConfiguredTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onStart: @escaping () -> Void,
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      switch self.factory {
      case let .http2(factory):
        let transport = factory.makeTransport(
          to: path,
          for: type,
          withOptions: options,
          onEventLoop: eventLoop,
          interceptedBy: interceptors,
          onStart: onStart,
          onError: onError,
          onResponsePart: onResponsePart
        )
        factory.configure(transport)
        return transport

      case let .fake(factory):
        // The fake factory's `makeTransport` has no `onStart` parameter, so `onStart` is not
        // passed along on this path.
        let transport = factory.makeTransport(
          to: path,
          for: type,
          withOptions: options,
          onEventLoop: eventLoop,
          interceptedBy: interceptors,
          onError: onError,
          onResponsePart
        )
        factory.configure(transport)
        return transport
      }
    }
  }
  /// Makes and configures `ClientTransport`s for RPCs carried over a `Channel`.
  @usableFromInline
  internal struct HTTP2ClientTransportFactory<Request, Response> {
    /// A future for the channel used by the call; the handlers for the call are added to its
    /// pipeline when the transport is configured.
    private var streamChannel: EventLoopFuture<Channel>

    /// The value used for the ":authority" pseudo-header.
    private var authority: String

    /// The value used for the ":scheme" pseudo-header.
    private var scheme: String

    /// An optional error delegate handed to each transport.
    private var errorDelegate: ClientErrorDelegate?

    /// The type-erased request serializer.
    private let serializer: AnySerializer<Request>

    /// The type-erased response deserializer.
    private let deserializer: AnyDeserializer<Response>

    /// Maximum allowed length of a received message.
    private let maximumReceiveMessageLength: Int

    /// Creates a factory, type-erasing the provided serializer and deserializer.
    /// - Parameters:
    ///   - streamChannel: A future for the channel the call's handlers are added to.
    ///   - scheme: The value of the ":scheme" pseudo-header.
    ///   - authority: The value of the ":authority" pseudo-header.
    ///   - serializer: Serializes `Request` messages.
    ///   - deserializer: Deserializes `Response` messages.
    ///   - maximumReceiveMessageLength: Maximum allowed length of a received message.
    ///   - errorDelegate: A client error delegate.
    @usableFromInline
    internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
      streamChannel: EventLoopFuture<Channel>,
      scheme: String,
      authority: String,
      serializer: Serializer,
      deserializer: Deserializer,
      maximumReceiveMessageLength: Int,
      errorDelegate: ClientErrorDelegate?
    ) where Serializer.Input == Request, Deserializer.Output == Response {
      self.streamChannel = streamChannel
      self.authority = authority
      self.scheme = scheme
      self.errorDelegate = errorDelegate
      self.serializer = AnySerializer(wrapping: serializer)
      self.deserializer = AnyDeserializer(wrapping: deserializer)
      self.maximumReceiveMessageLength = maximumReceiveMessageLength
    }

    /// Makes a transport for the RPC described by `path`, `type` and `options`. The transport is
    /// not yet attached to a channel; see `configure(_:)`.
    fileprivate func makeTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onStart: @escaping () -> Void,
      onError: @escaping (Error) -> Void,
      onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      let details = self.makeCallDetails(type: type, path: path, options: options)
      return ClientTransport(
        details: details,
        eventLoop: eventLoop,
        interceptors: interceptors,
        serializer: self.serializer,
        deserializer: self.deserializer,
        errorDelegate: self.errorDelegate,
        onStart: onStart,
        onError: onError,
        onResponsePart: onResponsePart
      )
    }

    /// Configures `transport` so that, once `streamChannel` resolves, a
    /// `GRPCClientChannelHandler` followed by the transport itself are added to that channel's
    /// pipeline.
    fileprivate func configure(_ transport: ClientTransport<Request, Response>) {
      transport.configure { _ in
        self.streamChannel.flatMapThrowing { channel in
          // As noted by the original author, this callback runs on the appropriate event loop,
          // so the synchronous pipeline operations are fine to use here.
          let pipelineOperations = channel.pipeline.syncOperations
          let grpcHandler = GRPCClientChannelHandler(
            callType: transport.callDetails.type,
            maximumReceiveMessageLength: self.maximumReceiveMessageLength,
            logger: transport.logger
          )
          try pipelineOperations.addHandler(grpcHandler)
          try pipelineOperations.addHandler(transport)
        }
      }
    }

    /// Combines the per-call values with this factory's authority and scheme.
    private func makeCallDetails(
      type: GRPCCallType,
      path: String,
      options: CallOptions
    ) -> CallDetails {
      return CallDetails(
        type: type,
        path: path,
        authority: self.authority,
        scheme: self.scheme,
        options: options
      )
    }
  }
  /// Makes and configures `ClientTransport`s which are backed by a `_FakeResponseStream` rather
  /// than a connection.
  @usableFromInline
  internal struct FakeClientTransportFactory<Request, Response> {
    /// The fake response stream for the call. This can be `nil` if the user did not correctly
    /// configure their client, in which case configuring the transport yields a failed future.
    private var fakeResponseStream: _FakeResponseStream<Request, Response>?

    /// The type-erased request serializer.
    private let requestSerializer: AnySerializer<Request>

    /// The type-erased response deserializer.
    private let responseDeserializer: AnyDeserializer<Response>

    /// A codec for deserializing requests and serializing responses.
    private let codec: ChannelHandler

    /// Creates a factory for fake transport.
    /// - Parameters:
    ///   - fakeResponseStream: The fake response stream, if one was configured.
    ///   - requestSerializer: Serializes `Request` messages; used by the transport.
    ///   - requestDeserializer: Deserializes `Request` messages; used by the codec.
    ///   - responseSerializer: Serializes `Response` messages; used by the codec.
    ///   - responseDeserializer: Deserializes `Response` messages; used by the transport.
    @usableFromInline
    internal init<
      RequestSerializer: MessageSerializer,
      RequestDeserializer: MessageDeserializer,
      ResponseSerializer: MessageSerializer,
      ResponseDeserializer: MessageDeserializer
    >(
      _ fakeResponseStream: _FakeResponseStream<Request, Response>?,
      requestSerializer: RequestSerializer,
      requestDeserializer: RequestDeserializer,
      responseSerializer: ResponseSerializer,
      responseDeserializer: ResponseDeserializer
    ) where RequestSerializer.Input == Request,
      RequestDeserializer.Output == Request,
      ResponseSerializer.Input == Response,
      ResponseDeserializer.Output == Response {
      self.fakeResponseStream = fakeResponseStream
      self.requestSerializer = AnySerializer(wrapping: requestSerializer)
      self.responseDeserializer = AnyDeserializer(wrapping: responseDeserializer)
      // The codec works in the opposite direction to the transport: it reads requests and
      // writes responses.
      self.codec = GRPCClientReverseCodecHandler(
        serializer: responseSerializer,
        deserializer: requestDeserializer
      )
    }

    /// Makes a transport for the RPC. The call details use a fixed authority ("localhost") and
    /// scheme ("http"); no error delegate is set and the `onStart` callback does nothing.
    fileprivate func makeTransport(
      to path: String,
      for type: GRPCCallType,
      withOptions options: CallOptions,
      onEventLoop eventLoop: EventLoop,
      interceptedBy interceptors: [ClientInterceptor<Request, Response>],
      onError: @escaping (Error) -> Void,
      _ onResponsePart: @escaping (GRPCClientResponsePart<Response>) -> Void
    ) -> ClientTransport<Request, Response> {
      let details = CallDetails(
        type: type,
        path: path,
        authority: "localhost",
        scheme: "http",
        options: options
      )
      return ClientTransport(
        details: details,
        eventLoop: eventLoop,
        interceptors: interceptors,
        serializer: self.requestSerializer,
        deserializer: self.responseDeserializer,
        errorDelegate: nil,
        onStart: {},
        onError: onError,
        onResponsePart: onResponsePart
      )
    }

    /// Configures `transport` by adding the codec and the provided handler to the fake response
    /// stream's channel, activating the stream if that succeeds. Without a fake response stream
    /// the configuration fails with an `.unavailable` status.
    fileprivate func configure(_ transport: ClientTransport<Request, Response>) {
      transport.configure { handler in
        guard let stream = self.fakeResponseStream else {
          // No stream was provided: fail on the call's event loop.
          return transport.callEventLoop
            .makeFailedFuture(GRPCStatus(code: .unavailable, message: nil))
        }

        let handlersAdded = stream.channel.pipeline.addHandlers(self.codec, handler)
        return handlersAdded.always { result in
          // Only activate the stream once the handlers are in place; a failure needs no action.
          if case .success = result {
            stream.activate()
          }
        }
      }
    }
  }