_GRPCClientCodecHandler.swift 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  17. internal class GRPCClientCodecHandler<
  18. Serializer: MessageSerializer,
  19. Deserializer: MessageDeserializer
  20. > {
  21. /// The request serializer.
  22. private let serializer: Serializer
  23. /// The response deserializer.
  24. private let deserializer: Deserializer
  25. internal init(serializer: Serializer, deserializer: Deserializer) {
  26. self.serializer = serializer
  27. self.deserializer = deserializer
  28. }
  29. }
  30. extension GRPCClientCodecHandler: ChannelInboundHandler {
  31. typealias InboundIn = _RawGRPCClientResponsePart
  32. typealias InboundOut = _GRPCClientResponsePart<Deserializer.Output>
  33. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  34. switch self.unwrapInboundIn(data) {
  35. case let .initialMetadata(headers):
  36. context.fireChannelRead(self.wrapInboundOut(.initialMetadata(headers)))
  37. case let .message(messageContext):
  38. do {
  39. let response = try self.deserializer.deserialize(byteBuffer: messageContext.message)
  40. context
  41. .fireChannelRead(
  42. self
  43. .wrapInboundOut(.message(.init(response, compressed: messageContext.compressed)))
  44. )
  45. } catch {
  46. context.fireErrorCaught(error)
  47. }
  48. case let .trailingMetadata(trailers):
  49. context.fireChannelRead(self.wrapInboundOut(.trailingMetadata(trailers)))
  50. case let .status(status):
  51. context.fireChannelRead(self.wrapInboundOut(.status(status)))
  52. }
  53. }
  54. }
  55. extension GRPCClientCodecHandler: ChannelOutboundHandler {
  56. typealias OutboundIn = _GRPCClientRequestPart<Serializer.Input>
  57. typealias OutboundOut = _RawGRPCClientRequestPart
  58. internal func write(
  59. context: ChannelHandlerContext,
  60. data: NIOAny,
  61. promise: EventLoopPromise<Void>?
  62. ) {
  63. switch self.unwrapOutboundIn(data) {
  64. case let .head(head):
  65. context.write(self.wrapOutboundOut(.head(head)), promise: promise)
  66. case let .message(message):
  67. do {
  68. let serialized = try self.serializer.serialize(
  69. message.message,
  70. allocator: context.channel.allocator
  71. )
  72. context.write(
  73. self.wrapOutboundOut(.message(.init(serialized, compressed: message.compressed))),
  74. promise: promise
  75. )
  76. } catch {
  77. promise?.fail(error)
  78. context.fireErrorCaught(error)
  79. }
  80. case .end:
  81. context.write(self.wrapOutboundOut(.end), promise: promise)
  82. }
  83. }
  84. }
  85. // MARK: Reverse Codec
  86. internal class GRPCClientReverseCodecHandler<
  87. Serializer: MessageSerializer,
  88. Deserializer: MessageDeserializer
  89. > {
  90. /// The request serializer.
  91. private let serializer: Serializer
  92. /// The response deserializer.
  93. private let deserializer: Deserializer
  94. internal init(serializer: Serializer, deserializer: Deserializer) {
  95. self.serializer = serializer
  96. self.deserializer = deserializer
  97. }
  98. }
  99. extension GRPCClientReverseCodecHandler: ChannelInboundHandler {
  100. typealias InboundIn = _GRPCClientResponsePart<Serializer.Input>
  101. typealias InboundOut = _RawGRPCClientResponsePart
  102. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  103. switch self.unwrapInboundIn(data) {
  104. case let .initialMetadata(headers):
  105. context.fireChannelRead(self.wrapInboundOut(.initialMetadata(headers)))
  106. case let .message(messageContext):
  107. do {
  108. let response = try self.serializer.serialize(
  109. messageContext.message,
  110. allocator: context.channel.allocator
  111. )
  112. context.fireChannelRead(
  113. self.wrapInboundOut(.message(.init(response, compressed: messageContext.compressed)))
  114. )
  115. } catch {
  116. context.fireErrorCaught(error)
  117. }
  118. case let .trailingMetadata(trailers):
  119. context.fireChannelRead(self.wrapInboundOut(.trailingMetadata(trailers)))
  120. case let .status(status):
  121. context.fireChannelRead(self.wrapInboundOut(.status(status)))
  122. }
  123. }
  124. }
  125. extension GRPCClientReverseCodecHandler: ChannelOutboundHandler {
  126. typealias OutboundIn = _RawGRPCClientRequestPart
  127. typealias OutboundOut = _GRPCClientRequestPart<Deserializer.Output>
  128. internal func write(
  129. context: ChannelHandlerContext,
  130. data: NIOAny,
  131. promise: EventLoopPromise<Void>?
  132. ) {
  133. switch self.unwrapOutboundIn(data) {
  134. case let .head(head):
  135. context.write(self.wrapOutboundOut(.head(head)), promise: promise)
  136. case let .message(message):
  137. do {
  138. let deserialized = try self.deserializer.deserialize(byteBuffer: message.message)
  139. context.write(
  140. self.wrapOutboundOut(.message(.init(deserialized, compressed: message.compressed))),
  141. promise: promise
  142. )
  143. } catch {
  144. promise?.fail(error)
  145. context.fireErrorCaught(error)
  146. }
  147. case .end:
  148. context.write(self.wrapOutboundOut(.end), promise: promise)
  149. }
  150. }
  151. }