HTTP2StatusCodeServer.swift 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCHTTP2Core
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import NIOPosix
  21. /// An HTTP/2 test server which only responds to request headers by sending response headers and
  22. /// then closing. Each stream will be closed with the ":status" set to the value of the
  23. /// "response-status" header field in the request headers.
  24. final class HTTP2StatusCodeServer {
  25. private let address: EventLoopPromise<GRPCHTTP2Core.SocketAddress.IPv4>
  26. private let eventLoopGroup: MultiThreadedEventLoopGroup
  27. var listeningAddress: GRPCHTTP2Core.SocketAddress.IPv4 {
  28. get async throws {
  29. try await self.address.futureResult.get()
  30. }
  31. }
  32. init() {
  33. self.eventLoopGroup = .singleton
  34. self.address = self.eventLoopGroup.next().makePromise()
  35. }
  36. func run() async throws {
  37. do {
  38. let channel = try await ServerBootstrap(group: .singletonMultiThreadedEventLoopGroup)
  39. .bind(host: "127.0.0.1", port: 0) { channel in
  40. channel.eventLoop.makeCompletedFuture {
  41. let sync = channel.pipeline.syncOperations
  42. let multiplexer = try sync.configureAsyncHTTP2Pipeline(mode: .server) { stream in
  43. stream.eventLoop.makeCompletedFuture {
  44. try NIOAsyncChannel<HTTP2Frame.FramePayload, HTTP2Frame.FramePayload>(
  45. wrappingChannelSynchronously: stream
  46. )
  47. }
  48. }
  49. let wrapped = try NIOAsyncChannel<HTTP2Frame, HTTP2Frame>(
  50. wrappingChannelSynchronously: channel
  51. )
  52. return (wrapped, multiplexer)
  53. }
  54. }
  55. let port = channel.channel.localAddress!.port!
  56. self.address.succeed(.init(host: "127.0.0.1", port: port))
  57. try await channel.executeThenClose { inbound in
  58. try await withThrowingTaskGroup(of: Void.self) { acceptedGroup in
  59. for try await (accepted, mux) in inbound {
  60. acceptedGroup.addTask {
  61. try await withThrowingTaskGroup(of: Void.self) { connectionGroup in
  62. // Run the connection.
  63. connectionGroup.addTask {
  64. try await accepted.executeThenClose { inbound, outbound in
  65. for try await _ in inbound {}
  66. }
  67. }
  68. // Consume the streams.
  69. for try await stream in mux.inbound {
  70. connectionGroup.addTask {
  71. try await stream.executeThenClose { inbound, outbound in
  72. do {
  73. for try await frame in inbound {
  74. switch frame {
  75. case .headers(let requestHeaders):
  76. if let status = requestHeaders.headers.first(name: "response-status") {
  77. let headers: HPACKHeaders = [":status": "\(status)"]
  78. try await outbound.write(
  79. .headers(.init(headers: headers, endStream: true))
  80. )
  81. }
  82. default:
  83. () // Ignore the others
  84. }
  85. }
  86. } catch {
  87. // Ignore errors
  88. }
  89. }
  90. }
  91. }
  92. }
  93. }
  94. }
  95. }
  96. }
  97. } catch {
  98. self.address.fail(error)
  99. }
  100. }
  101. }