EmbeddedGRPCChannel.swift 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  19. import SwiftProtobuf
  /// A `GRPCChannel` built on top of an `EmbeddedChannel`.
  ///
  /// This is currently intended for internal testing only.
  class EmbeddedGRPCChannel: GRPCChannel {
    // MARK: - Stored properties

    /// The `EmbeddedChannel` created by `init`.
    let embeddedChannel: EmbeddedChannel

    /// Resolves to the `HTTP2StreamMultiplexer` found in the embedded channel's pipeline after
    /// `configureGRPCClient` has completed.
    let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

    /// Logger handed to `configureGRPCClient` and to the calls created by the `make…Call` methods
    /// which take a `logger`.
    let logger: Logger

    /// Scheme used for request heads and transports; always set to `"http"` by `init`.
    let scheme: String

    /// Authority (host) used for request heads and transports; always set to `"localhost"` by
    /// `init`.
    let authority: String

    /// Optional error delegate forwarded to the channel configuration and to every call.
    let errorDelegate: ClientErrorDelegate?

    // MARK: - Lifecycle

    /// Creates an `EmbeddedChannel`, configures it as a gRPC client and looks up its
    /// `HTTP2StreamMultiplexer`.
    ///
    /// - Parameters:
    ///   - logger: Logger to use; the default one is backed by `SwiftLogNoOpLogHandler`.
    ///   - errorDelegate: Optional delegate passed on to the configuration and to calls.
    init(
      logger: Logger = Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
      errorDelegate: ClientErrorDelegate? = nil
    ) {
      let channel = EmbeddedChannel()
      let configured = channel.configureGRPCClient(errorDelegate: errorDelegate, logger: logger)

      self.embeddedChannel = channel
      // The closure captures the local `channel` rather than `self`, so it may be formed before
      // every stored property has been initialized.
      self.multiplexer = configured.flatMap {
        channel.pipeline.handler(type: HTTP2StreamMultiplexer.self)
      }
      self.logger = logger
      self.errorDelegate = errorDelegate
      self.scheme = "http"
      self.authority = "localhost"
    }

    /// The event loop of the underlying `EmbeddedChannel`.
    var eventLoop: EventLoop {
      return embeddedChannel.eventLoop
    }

    /// Closes the underlying `EmbeddedChannel`.
    ///
    /// - Returns: The future returned by `EmbeddedChannel.close()`.
    func close() -> EventLoopFuture<Void> {
      return embeddedChannel.close()
    }

    // MARK: - Calls with interceptors

    /// Makes a `Call` for protobuf `Message` payloads using an HTTP/2 transport factory built from
    /// this channel's multiplexer, authority, scheme and error delegate.
    ///
    /// - Parameters:
    ///   - path: Path of the RPC.
    ///   - type: The kind of call to make.
    ///   - callOptions: Options for the call.
    ///   - interceptors: Interceptors to attach to the call.
    /// - Returns: The newly created `Call`.
    func makeCall<Request: Message, Response: Message>(
      path: String,
      type: GRPCCallType,
      callOptions: CallOptions,
      interceptors: [ClientInterceptor<Request, Response>]
    ) -> Call<Request, Response> {
      return Call(
        path: path,
        type: type,
        eventLoop: eventLoop,
        options: callOptions,
        interceptors: interceptors,
        transportFactory: .http2(
          multiplexer: multiplexer,
          authority: authority,
          scheme: scheme,
          errorDelegate: errorDelegate
        )
      )
    }

    /// Makes a `Call` for `GRPCPayload` payloads using an HTTP/2 transport factory built from this
    /// channel's multiplexer, authority, scheme and error delegate.
    ///
    /// - Parameters:
    ///   - path: Path of the RPC.
    ///   - type: The kind of call to make.
    ///   - callOptions: Options for the call.
    ///   - interceptors: Interceptors to attach to the call.
    /// - Returns: The newly created `Call`.
    func makeCall<Request: GRPCPayload, Response: GRPCPayload>(
      path: String,
      type: GRPCCallType,
      callOptions: CallOptions,
      interceptors: [ClientInterceptor<Request, Response>]
    ) -> Call<Request, Response> {
      return Call(
        path: path,
        type: type,
        eventLoop: eventLoop,
        options: callOptions,
        interceptors: interceptors,
        transportFactory: .http2(
          multiplexer: multiplexer,
          authority: authority,
          scheme: scheme,
          errorDelegate: errorDelegate
        )
      )
    }

    // MARK: - Protobuf calls

    /// Makes a unary call on the multiplexer, then sends the request head and `request` on it.
    ///
    /// - Parameters:
    ///   - path: Path of the RPC.
    ///   - request: The single request message.
    ///   - callOptions: Options for the call.
    /// - Returns: The call, after `send` has been invoked on it.
    func makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
      path: String,
      request: Request,
      callOptions: CallOptions
    ) -> UnaryCall<Request, Response> {
      let head = makeRequestHead(path: path, options: callOptions)
      let unary = UnaryCall<Request, Response>.makeOnHTTP2Stream(
        multiplexer: multiplexer,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        callOptions: callOptions,
        errorDelegate: errorDelegate,
        logger: logger
      )
      unary.send(head, request: request)
      return unary
    }

    /// Makes a client-streaming call on the multiplexer, then sends the request head on it.
    ///
    /// - Parameters:
    ///   - path: Path of the RPC.
    ///   - callOptions: Options for the call.
    /// - Returns: The call, after `sendHead` has been invoked on it.
    func makeClientStreamingCall<
      Request: SwiftProtobuf.Message,
      Response: SwiftProtobuf.Message
    >(
      path: String,
      callOptions: CallOptions
    ) -> ClientStreamingCall<Request, Response> {
      let head = makeRequestHead(path: path, options: callOptions)
      let stream = ClientStreamingCall<Request, Response>.makeOnHTTP2Stream(
        multiplexer: multiplexer,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        callOptions: callOptions,
        errorDelegate: errorDelegate,
        logger: logger
      )
      stream.sendHead(head)
      return stream
    }

    /// Makes a server-streaming call on the multiplexer, then sends the request head and `request`
    /// on it.
    ///
    /// - Parameters:
    ///   - path: Path of the RPC.
    ///   - request: The single request message.
    ///   - callOptions: Options for the call.
    ///   - handler: Passed to the call as its `responseHandler`.
    /// - Returns: The call, after `send` has been invoked on it.
    func makeServerStreamingCall<
      Request: SwiftProtobuf.Message,
      Response: SwiftProtobuf.Message
    >(
      path: String,
      request: Request,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> ServerStreamingCall<Request, Response> {
      let head = makeRequestHead(path: path, options: callOptions)
      let stream = ServerStreamingCall<Request, Response>.makeOnHTTP2Stream(
        multiplexer: multiplexer,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        callOptions: callOptions,
        errorDelegate: errorDelegate,
        logger: logger,
        responseHandler: handler
      )
      stream.send(head, request: request)
      return stream
    }

    /// Makes a bidirectional-streaming call on the multiplexer, then sends the request head on it.
    ///
    /// - Parameters:
    ///   - path: Path of the RPC.
    ///   - callOptions: Options for the call.
    ///   - handler: Passed to the call as its `responseHandler`.
    /// - Returns: The call, after `sendHead` has been invoked on it.
    func makeBidirectionalStreamingCall<
      Request: SwiftProtobuf.Message,
      Response: SwiftProtobuf.Message
    >(
      path: String,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> BidirectionalStreamingCall<Request, Response> {
      let head = makeRequestHead(path: path, options: callOptions)
      let stream = BidirectionalStreamingCall<Request, Response>.makeOnHTTP2Stream(
        multiplexer: multiplexer,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        callOptions: callOptions,
        errorDelegate: errorDelegate,
        logger: logger,
        responseHandler: handler
      )
      stream.sendHead(head)
      return stream
    }

    // MARK: - Helpers

    /// Builds the request head for `path` from this channel's scheme and authority; the request ID
    /// is always `nil`.
    private func makeRequestHead(path: String, options: CallOptions) -> _GRPCRequestHead {
      return _GRPCRequestHead(
        scheme: scheme,
        path: path,
        host: authority,
        options: options,
        requestID: nil
      )
    }
  }
  // MARK: - Unimplemented `GRPCPayload` calls

  extension EmbeddedGRPCChannel {
    // These overloads exist only so that the class conforms to `GRPCChannel`. The class is
    // internal and only used for tests, so leaving them unimplemented is acceptable for now.
    // Every one of them traps when called.

    /// Unimplemented: always traps with `fatalError`.
    func makeUnaryCall<Request, Response>(
      path: String,
      request: Request,
      callOptions: CallOptions
    ) -> UnaryCall<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
      fatalError("Not implemented")
    }

    /// Unimplemented: always traps with `fatalError`.
    func makeClientStreamingCall<Request, Response>(
      path: String,
      callOptions: CallOptions
    ) -> ClientStreamingCall<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
      fatalError("Not implemented")
    }

    /// Unimplemented: always traps with `fatalError`.
    func makeServerStreamingCall<Request, Response>(
      path: String,
      request: Request,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> ServerStreamingCall<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
      fatalError("Not implemented")
    }

    /// Unimplemented: always traps with `fatalError`.
    func makeBidirectionalStreamingCall<Request, Response>(
      path: String,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> BidirectionalStreamingCall<Request, Response>
      where Request: GRPCPayload, Response: GRPCPayload {
      fatalError("Not implemented")
    }
  }