_GRPCClientCodecHandler.swift 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIO
  /// A channel handler that pairs a request serializer with a response deserializer.
  ///
  /// The `ChannelInboundHandler` and `ChannelOutboundHandler` conformances of this type use the
  /// stored serializer and deserializer to translate between raw and typed client
  /// request/response parts.
  internal class GRPCClientCodecHandler<Serializer, Deserializer>
    where Serializer: MessageSerializer, Deserializer: MessageDeserializer {
    /// Serializes outbound request messages.
    private let serializer: Serializer

    /// Deserializes inbound response messages.
    private let deserializer: Deserializer

    /// Creates a codec handler.
    ///
    /// - Parameters:
    ///   - serializer: Used to serialize each request message written through this handler.
    ///   - deserializer: Used to deserialize each response message read by this handler.
    internal init(serializer: Serializer, deserializer: Deserializer) {
      self.serializer = serializer
      self.deserializer = deserializer
    }
  }
  // MARK: - Inbound (responses)

  extension GRPCClientCodecHandler: ChannelInboundHandler {
    typealias InboundIn = _RawGRPCClientResponsePart
    typealias InboundOut = _GRPCClientResponsePart<Deserializer.Output>

    /// Converts a raw response part into its typed counterpart and forwards it down the
    /// pipeline.
    ///
    /// Metadata, trailers and status are forwarded unchanged. Message payloads are run through
    /// the deserializer first; if that throws, the error is fired down the pipeline and no
    /// message part is forwarded.
    ///
    /// - Parameters:
    ///   - context: The context of this handler within the channel pipeline.
    ///   - data: The wrapped `_RawGRPCClientResponsePart` to convert.
    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let typedPart: InboundOut

      switch self.unwrapInboundIn(data) {
      case .initialMetadata(let headers):
        typedPart = .initialMetadata(headers)

      case .message(let rawMessage):
        do {
          let decoded = try self.deserializer.deserialize(byteBuffer: rawMessage.message)
          typedPart = .message(.init(decoded, compressed: rawMessage.compressed))
        } catch {
          // Nothing sensible can be forwarded for this part: surface the failure instead.
          context.fireErrorCaught(error)
          return
        }

      case .trailingMetadata(let trailers):
        typedPart = .trailingMetadata(trailers)

      case .status(let status):
        typedPart = .status(status)
      }

      context.fireChannelRead(self.wrapInboundOut(typedPart))
    }
  }
  // MARK: - Outbound (requests)

  extension GRPCClientCodecHandler: ChannelOutboundHandler {
    typealias OutboundIn = _GRPCClientRequestPart<Serializer.Input>
    typealias OutboundOut = _RawGRPCClientRequestPart

    /// Converts a typed request part into its raw counterpart and writes it to the next
    /// outbound handler.
    ///
    /// The head and end parts are passed through unchanged. Message payloads are run through
    /// the serializer first; if that throws, `promise` is failed with the error, the error is
    /// fired down the pipeline, and nothing is written.
    ///
    /// - Parameters:
    ///   - context: The context of this handler within the channel pipeline.
    ///   - data: The wrapped `_GRPCClientRequestPart` to convert.
    ///   - promise: Forwarded with the write, or failed if serialization throws.
    internal func write(
      context: ChannelHandlerContext,
      data: NIOAny,
      promise: EventLoopPromise<Void>?
    ) {
      let rawPart: OutboundOut

      switch self.unwrapOutboundIn(data) {
      case .head(let head):
        rawPart = .head(head)

      case .message(let typedMessage):
        do {
          let payload = try self.serializer.serialize(
            typedMessage.message,
            allocator: context.channel.allocator
          )
          rawPart = .message(.init(payload, compressed: typedMessage.compressed))
        } catch {
          // Tell the writer first, then let the rest of the pipeline observe the failure.
          promise?.fail(error)
          context.fireErrorCaught(error)
          return
        }

      case .end:
        rawPart = .end
      }

      context.write(self.wrapOutboundOut(rawPart), promise: promise)
    }
  }