EmbeddedGRPCChannel.swift 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. import NIOHTTP2
  18. import Logging
  19. import SwiftProtobuf
  20. // This is currently intended for internal testing only.
  21. class EmbeddedGRPCChannel: GRPCChannel {
  22. let embeddedChannel: EmbeddedChannel
  23. let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
  24. let logger: Logger
  25. let scheme: String
  26. let authority: String
  27. let errorDelegate: ClientErrorDelegate?
  28. func close() -> EventLoopFuture<Void> {
  29. return embeddedChannel.close()
  30. }
  31. var eventLoop: EventLoop {
  32. return self.embeddedChannel.eventLoop
  33. }
  34. init(errorDelegate: ClientErrorDelegate? = nil) {
  35. let embeddedChannel = EmbeddedChannel()
  36. self.embeddedChannel = embeddedChannel
  37. let logger = Logger(subsystem: .clientChannel)
  38. self.logger = logger
  39. self.multiplexer = embeddedChannel.configureGRPCClient(
  40. errorDelegate: errorDelegate,
  41. logger: logger
  42. ).flatMap {
  43. embeddedChannel.pipeline.handler(type: HTTP2StreamMultiplexer.self)
  44. }
  45. self.scheme = "http"
  46. self.authority = "localhost"
  47. self.errorDelegate = errorDelegate
  48. }
  49. private func makeRequestHead(path: String, options: CallOptions) -> _GRPCRequestHead {
  50. return _GRPCRequestHead(
  51. scheme: self.scheme,
  52. path: path,
  53. host: self.authority,
  54. requestID: options.requestIDProvider.requestID(),
  55. options: options
  56. )
  57. }
  58. internal func makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  59. path: String,
  60. request: Request,
  61. callOptions: CallOptions
  62. ) -> UnaryCall<Request, Response> {
  63. let call = UnaryCall<Request, Response>.makeOnHTTP2Stream(
  64. multiplexer: self.multiplexer,
  65. serializer: ProtobufSerializer(),
  66. deserializer: ProtobufDeserializer(),
  67. callOptions: callOptions,
  68. errorDelegate: self.errorDelegate,
  69. logger: self.logger
  70. )
  71. call.send(self.makeRequestHead(path: path, options: callOptions), request: request)
  72. return call
  73. }
  74. internal func makeClientStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  75. path: String,
  76. callOptions: CallOptions
  77. ) -> ClientStreamingCall<Request, Response> {
  78. let call = ClientStreamingCall<Request, Response>.makeOnHTTP2Stream(
  79. multiplexer: self.multiplexer,
  80. serializer: ProtobufSerializer(),
  81. deserializer: ProtobufDeserializer(),
  82. callOptions: callOptions,
  83. errorDelegate: self.errorDelegate,
  84. logger: self.logger
  85. )
  86. call.sendHead(self.makeRequestHead(path: path, options: callOptions))
  87. return call
  88. }
  89. internal func makeServerStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  90. path: String,
  91. request: Request,
  92. callOptions: CallOptions,
  93. handler: @escaping (Response) -> Void
  94. ) -> ServerStreamingCall<Request, Response> {
  95. let call = ServerStreamingCall<Request, Response>.makeOnHTTP2Stream(
  96. multiplexer: self.multiplexer,
  97. serializer: ProtobufSerializer(),
  98. deserializer: ProtobufDeserializer(),
  99. callOptions: callOptions,
  100. errorDelegate: self.errorDelegate,
  101. logger: self.logger,
  102. responseHandler: handler
  103. )
  104. call.send(self.makeRequestHead(path: path, options: callOptions), request: request)
  105. return call
  106. }
  107. internal func makeBidirectionalStreamingCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
  108. path: String,
  109. callOptions: CallOptions,
  110. handler: @escaping (Response) -> Void
  111. ) -> BidirectionalStreamingCall<Request, Response> {
  112. let call = BidirectionalStreamingCall<Request, Response>.makeOnHTTP2Stream(
  113. multiplexer: self.multiplexer,
  114. serializer: ProtobufSerializer(),
  115. deserializer: ProtobufDeserializer(),
  116. callOptions: callOptions,
  117. errorDelegate: self.errorDelegate,
  118. logger: self.logger,
  119. responseHandler: handler
  120. )
  121. call.sendHead(self.makeRequestHead(path: path, options: callOptions))
  122. return call
  123. }
  124. }
  125. extension EmbeddedGRPCChannel {
  126. // We need these to conform to `GRPCChannel`. This class is internal and only used for tests so
  127. // it's okay that they're unimplemented for now.
  128. internal func makeUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
  129. path: String,
  130. request: Request,
  131. callOptions: CallOptions
  132. ) -> UnaryCall<Request, Response> {
  133. fatalError("Not implemented")
  134. }
  135. internal func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  136. path: String,
  137. callOptions: CallOptions
  138. ) -> ClientStreamingCall<Request, Response> {
  139. fatalError("Not implemented")
  140. }
  141. internal func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  142. path: String,
  143. request: Request,
  144. callOptions: CallOptions,
  145. handler: @escaping (Response) -> Void
  146. ) -> ServerStreamingCall<Request, Response> {
  147. fatalError("Not implemented")
  148. }
  149. internal func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
  150. path: String,
  151. callOptions: CallOptions,
  152. handler: @escaping (Response) -> Void
  153. ) -> BidirectionalStreamingCall<Request, Response> {
  154. fatalError("Not implemented")
  155. }
  156. }