EmbeddedGRPCChannel.swift 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Logging
  17. import NIO
  18. import NIOHTTP2
  19. import SwiftProtobuf
  /// A `GRPCChannel` implementation whose calls are created on an NIO `EmbeddedChannel`.
  ///
  /// This is currently intended for internal testing only.
  class EmbeddedGRPCChannel: GRPCChannel {
    // MARK: - Stored state

    /// The channel created in `init` and configured with `configureGRPCClient`.
    let embeddedChannel: EmbeddedChannel

    /// Resolves to the `HTTP2StreamMultiplexer` found in `embeddedChannel`'s pipeline once
    /// `configureGRPCClient` has completed.
    let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

    /// Logger handed to the client configuration and to every call made on this channel.
    let logger: Logger

    /// Scheme written into each request head; `init` sets it to `"http"`.
    let scheme: String

    /// Value passed as the `host` of each request head; `init` sets it to `"localhost"`.
    let authority: String

    /// Optional delegate handed to the client configuration and to every call made on this channel.
    let errorDelegate: ClientErrorDelegate?

    /// The event loop of the underlying `embeddedChannel`.
    var eventLoop: EventLoop {
      return embeddedChannel.eventLoop
    }

    // MARK: - Lifecycle

    /// Creates a new `EmbeddedChannel`, configures it as a gRPC client and looks up the
    /// `HTTP2StreamMultiplexer` in its pipeline.
    ///
    /// - Parameters:
    ///   - logger: Logger used for configuration and calls. Defaults to a logger labelled
    ///     `"io.grpc"` whose handler factory returns `SwiftLogNoOpLogHandler()`.
    ///   - errorDelegate: Optional error delegate; defaults to `nil`.
    init(
      logger: Logger = Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
      errorDelegate: ClientErrorDelegate? = nil
    ) {
      let channel = EmbeddedChannel()
      self.embeddedChannel = channel
      self.logger = logger

      // `self` is not fully initialised yet, so the closure captures the local `channel`.
      let configured = channel.configureGRPCClient(errorDelegate: errorDelegate, logger: logger)
      self.multiplexer = configured.flatMap {
        channel.pipeline.handler(type: HTTP2StreamMultiplexer.self)
      }

      self.scheme = "http"
      self.authority = "localhost"
      self.errorDelegate = errorDelegate
    }

    /// Closes the underlying `embeddedChannel`.
    ///
    /// - Returns: The future returned by `embeddedChannel.close()`.
    func close() -> EventLoopFuture<Void> {
      return embeddedChannel.close()
    }

    // MARK: - Request head

    /// Builds the request head for a call to `path` from this channel's `scheme` and
    /// `authority`; `requestID` is always `nil`.
    private func makeRequestHead(path: String, options: CallOptions) -> _GRPCRequestHead {
      let head = _GRPCRequestHead(
        scheme: scheme,
        path: path,
        host: authority,
        options: options,
        requestID: nil
      )
      return head
    }

    // MARK: - SwiftProtobuf.Message calls

    /// Creates a unary call on the multiplexer and sends the request head together with `request`.
    ///
    /// - Parameters:
    ///   - path: Path placed in the request head.
    ///   - request: The single request message to send.
    ///   - callOptions: Options used for the call and for the request head.
    /// - Returns: The call, after the head and request have been handed to it.
    func makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
      path: String,
      request: Request,
      callOptions: CallOptions
    ) -> UnaryCall<Request, Response> {
      let unary = UnaryCall<Request, Response>.makeOnHTTP2Stream(
        multiplexer: multiplexer,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        callOptions: callOptions,
        errorDelegate: errorDelegate,
        logger: logger
      )
      let head = makeRequestHead(path: path, options: callOptions)
      unary.send(head, request: request)
      return unary
    }

    /// Creates a client-streaming call on the multiplexer and sends only the request head.
    ///
    /// - Parameters:
    ///   - path: Path placed in the request head.
    ///   - callOptions: Options used for the call and for the request head.
    /// - Returns: The call, after the head has been handed to it.
    func makeClientStreamingCall<
      Request: SwiftProtobuf.Message,
      Response: SwiftProtobuf.Message
    >(
      path: String,
      callOptions: CallOptions
    ) -> ClientStreamingCall<Request, Response> {
      let clientStreaming = ClientStreamingCall<Request, Response>.makeOnHTTP2Stream(
        multiplexer: multiplexer,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        callOptions: callOptions,
        errorDelegate: errorDelegate,
        logger: logger
      )
      let head = makeRequestHead(path: path, options: callOptions)
      clientStreaming.sendHead(head)
      return clientStreaming
    }

    /// Creates a server-streaming call on the multiplexer and sends the request head together
    /// with `request`.
    ///
    /// - Parameters:
    ///   - path: Path placed in the request head.
    ///   - request: The single request message to send.
    ///   - callOptions: Options used for the call and for the request head.
    ///   - handler: Passed to the call as its `responseHandler`.
    /// - Returns: The call, after the head and request have been handed to it.
    func makeServerStreamingCall<
      Request: SwiftProtobuf.Message,
      Response: SwiftProtobuf.Message
    >(
      path: String,
      request: Request,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> ServerStreamingCall<Request, Response> {
      let serverStreaming = ServerStreamingCall<Request, Response>.makeOnHTTP2Stream(
        multiplexer: multiplexer,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        callOptions: callOptions,
        errorDelegate: errorDelegate,
        logger: logger,
        responseHandler: handler
      )
      let head = makeRequestHead(path: path, options: callOptions)
      serverStreaming.send(head, request: request)
      return serverStreaming
    }

    /// Creates a bidirectional-streaming call on the multiplexer and sends only the request head.
    ///
    /// - Parameters:
    ///   - path: Path placed in the request head.
    ///   - callOptions: Options used for the call and for the request head.
    ///   - handler: Passed to the call as its `responseHandler`.
    /// - Returns: The call, after the head has been handed to it.
    func makeBidirectionalStreamingCall<
      Request: SwiftProtobuf.Message,
      Response: SwiftProtobuf.Message
    >(
      path: String,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> BidirectionalStreamingCall<Request, Response> {
      let bidirectional = BidirectionalStreamingCall<Request, Response>.makeOnHTTP2Stream(
        multiplexer: multiplexer,
        serializer: ProtobufSerializer(),
        deserializer: ProtobufDeserializer(),
        callOptions: callOptions,
        errorDelegate: errorDelegate,
        logger: logger,
        responseHandler: handler
      )
      let head = makeRequestHead(path: path, options: callOptions)
      bidirectional.sendHead(head)
      return bidirectional
    }
  }
  // MARK: - GRPCPayload overloads (unimplemented)

  extension EmbeddedGRPCChannel {
    // These overloads exist only so that the type conforms to `GRPCChannel`. The class is
    // internal and only used for tests, so leaving them unimplemented is acceptable for now.
    // Each one traps with `fatalError("Not implemented")` when called.

    /// Unimplemented `GRPCPayload` variant of the unary call factory; always traps.
    func makeUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
      path: String,
      request: Request,
      callOptions: CallOptions
    ) -> UnaryCall<Request, Response> {
      fatalError("Not implemented")
    }

    /// Unimplemented `GRPCPayload` variant of the client-streaming call factory; always traps.
    func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
      path: String,
      callOptions: CallOptions
    ) -> ClientStreamingCall<Request, Response> {
      fatalError("Not implemented")
    }

    /// Unimplemented `GRPCPayload` variant of the server-streaming call factory; always traps.
    func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
      path: String,
      request: Request,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> ServerStreamingCall<Request, Response> {
      fatalError("Not implemented")
    }

    /// Unimplemented `GRPCPayload` variant of the bidirectional-streaming call factory; always
    /// traps.
    func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
      path: String,
      callOptions: CallOptions,
      handler: @escaping (Response) -> Void
    ) -> BidirectionalStreamingCall<Request, Response> {
      fatalError("Not implemented")
    }
  }