HTTP1ToGRPCServerCodecTests.swift 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import EchoImplementation
  17. import EchoModel
  18. import Foundation
  19. @testable import GRPC
  20. import Logging
  21. import NIO
  22. import NIOHTTP1
  23. import XCTest
  /// A minimal inbound handler that runs a closure exactly once: on the first
  /// `channelRead` it observes. Every read is forwarded down the pipeline unchanged.
  final class OnFirstReadHandler: ChannelInboundHandler {
    typealias InboundIn = Any
    typealias InboundOut = Any

    /// The closure still waiting to run; `nil` once it has been invoked.
    private var pendingCallback: (() -> Void)?

    /// - Parameter callback: Invoked once, after the first read has been forwarded.
    init(callback: @escaping () -> Void) {
      self.pendingCallback = callback
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      // Forward first, so the read is passed on before the callback runs.
      context.fireChannelRead(data)

      guard let fire = self.pendingCallback else {
        return
      }

      // Clear the stored closure *before* invoking it: if the callback causes another
      // `channelRead` on this handler, it must not run a second time.
      self.pendingCallback = nil
      fire()
    }
  }
  /// An inbound handler that keeps a record of every error passing through it and then
  /// forwards the error along the pipeline.
  final class ErrorRecordingHandler: ChannelInboundHandler {
    typealias InboundIn = Any

    /// Every error seen so far, in the order `errorCaught` was called.
    var errors: [Error] = []

    func errorCaught(context: ChannelHandlerContext, error: Error) {
      // Record first, then pass the error on so later handlers still observe it.
      self.errors += [error]
      context.fireErrorCaught(error)
    }
  }
  /// Tests for `HTTP1ToGRPCServerCodec`, driven through an `EmbeddedChannel` whose only
  /// initial handler is the codec under test.
  class HTTP1ToGRPCServerCodecTests: GRPCTestCase {
    var channel: EmbeddedChannel!

    override func setUp() {
      super.setUp()
      let handler = HTTP1ToGRPCServerCodec(encoding: .disabled, logger: self.logger)
      self.channel = EmbeddedChannel(handler: handler)
    }

    override func tearDown() {
      XCTAssertNoThrow(try self.channel.finish())
      super.tearDown()
    }

    /// Builds the request head used by every test: a `POST` to `/echo.Echo/Get`
    /// with HTTP version 2.0.
    func makeRequestHead() -> HTTPRequestHead {
      return HTTPRequestHead(
        version: .init(major: 2, minor: 0),
        method: .POST,
        uri: "/echo.Echo/Get"
      )
    }

    // MARK: - Helpers

    /// Writes the request head inbound and checks that the codec emits a `.head` part.
    private func writeRequestHeadAndExpectHeadPart() throws {
      XCTAssertNoThrow(
        try self.channel
          .writeInbound(HTTPServerRequestPart.head(self.makeRequestHead()))
      )

      let requestPart = try self.channel.readInbound(as: _RawGRPCServerRequestPart.self)
      switch requestPart {
      case .some(.head):
        ()
      default:
        XCTFail("Unexpected request part: \(String(describing: requestPart))")
      }
    }

    /// Serializes one `Echo_EchoRequest` per entry in `texts`, preserving order.
    private func makeSerializedEchoRequests(_ texts: [String]) throws -> [Data] {
      return try texts.map { text in
        try Echo_EchoRequest.with { $0.text = text }.serializedData()
      }
    }

    /// Appends `payload` to `buffer` as one framed message: a one-byte compression flag
    /// (0, i.e. not compressed), a `UInt32` payload length, then the payload bytes.
    private func writeFramedMessage(_ payload: Data, into buffer: inout ByteBuffer) {
      buffer.writeInteger(UInt8(0))
      buffer.writeInteger(UInt32(payload.count))
      buffer.writeBytes(payload)
    }

    /// Returns a single buffer containing every payload in `payloads`, each framed in turn.
    private func makeBody(framing payloads: [Data]) -> ByteBuffer {
      var buffer = self.channel.allocator.buffer(capacity: 0)
      for payload in payloads {
        self.writeFramedMessage(payload, into: &buffer)
      }
      return buffer
    }

    /// Reads the next inbound part and checks it is a `.message` whose bytes equal `expected`.
    ///
    /// The comparison is made against the optional result of `readData` rather than a
    /// force-unwrapped value, so an unreadable buffer is a test failure and not a crash.
    private func expectNextInboundMessage(equals expected: Data) throws {
      let part = try self.channel.readInbound(as: _RawGRPCServerRequestPart.self)
      switch part {
      case var .some(.message(buffer)):
        XCTAssertEqual(expected, buffer.readData(length: buffer.readableBytes))
      default:
        XCTFail("Unexpected request part: \(String(describing: part))")
      }
    }

    // MARK: - Tests

    /// A message whose frame is split over two body parts is delivered as one message.
    func testSingleMessageFromMultipleBodyParts() throws {
      try self.writeRequestHeadAndExpectHeadPart()

      // Write a message across multiple buffers.
      let message = Echo_EchoRequest.with { $0.text = String(repeating: "x", count: 42) }
      let data = try message.serializedData()

      // Split the payload into two parts.
      let halfCount = data.count / 2
      let firstChunk = data.prefix(halfCount)
      let secondChunk = data.dropFirst(halfCount)

      // Frame the message; send it in 2 parts. The first part carries the 5-byte frame
      // header (flag + length of the *whole* payload) plus the first half of the payload.
      var firstBuffer = self.channel.allocator.buffer(capacity: firstChunk.count + 5)
      firstBuffer.writeInteger(UInt8(0))
      firstBuffer.writeInteger(UInt32(data.count))
      firstBuffer.writeBytes(firstChunk)
      XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.body(firstBuffer)))

      var secondBuffer = self.channel.allocator.buffer(capacity: secondChunk.count)
      secondBuffer.writeBytes(secondChunk)
      XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.body(secondBuffer)))

      try self.expectNextInboundMessage(equals: data)
    }

    /// Several framed messages in one body part are delivered individually and in order.
    func testMultipleMessagesFromSingleBodyPart() throws {
      try self.writeRequestHeadAndExpectHeadPart()

      // Write three messages into a single body.
      let serializedMessages = try self.makeSerializedEchoRequests(["foo", "bar", "baz"])
      let body = self.makeBody(framing: serializedMessages)
      XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.body(body)))

      for message in serializedMessages {
        try self.expectNextInboundMessage(equals: message)
      }
    }

    /// Writing more data from within a read callback must not re-order delivered messages.
    func testReentrantMessageDelivery() throws {
      try self.writeRequestHeadAndExpectHeadPart()

      // Write three messages into a single body.
      let serializedMessages = try self.makeSerializedEchoRequests(["foo", "bar", "baz"])
      let body = self.makeBody(framing: serializedMessages)

      // Create an OnFirstReadHandler that will _also_ send the data when it sees the first
      // read. The callback cannot throw, so a failed write is recorded as a test failure.
      let onFirstRead = OnFirstReadHandler {
        XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.body(body)))
      }
      XCTAssertNoThrow(try self.channel.pipeline.addHandler(onFirstRead).wait())

      // Now write.
      XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.body(body)))

      // This must not re-order messages: the three original messages come first, then the
      // three written re-entrantly.
      for message in serializedMessages + serializedMessages {
        try self.expectNextInboundMessage(equals: message)
      }
    }

    /// A bad frame written from within a read callback produces exactly one error.
    func testErrorsOnlyHappenOnce() throws {
      try self.writeRequestHeadAndExpectHeadPart()

      // Write three messages into a single body.
      let serializedMessages = try self.makeSerializedEchoRequests(["foo", "bar", "baz"])
      let body = self.makeBody(framing: serializedMessages)

      // Create an OnFirstReadHandler that sends a *bad* message when it sees the first read.
      let onFirstRead = OnFirstReadHandler {
        // Let's create a bad message: we'll turn on compression. We use two bytes here to
        // deal with the fact that in hitting the error we'll actually consume the first
        // byte (whoops).
        var badBuffer = self.channel.allocator.buffer(capacity: 0)
        badBuffer.writeInteger(UInt8(1))
        badBuffer.writeInteger(UInt8(1))
        // Deliberately best-effort: this write is expected to fail, and the failure is
        // checked through the recorded errors below rather than through a thrown error.
        _ = try? self.channel.writeInbound(HTTPServerRequestPart.body(badBuffer))
      }
      let errorHandler = ErrorRecordingHandler()
      XCTAssertNoThrow(try self.channel.pipeline.addHandlers([onFirstRead, errorHandler]).wait())

      // Now write.
      XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.body(body)))

      // We should have seen the original three messages.
      for message in serializedMessages {
        try self.expectNextInboundMessage(equals: message)
      }

      // We should have recorded only one error.
      XCTAssertEqual(errorHandler.errors.count, 1)
    }
  }