HTTP2StatusCodeServer.swift 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. /*
  2. * Copyright 2024, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import GRPCHTTP2Core
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP2
  20. import NIOPosix
  21. /// An HTTP/2 test server which only responds to request headers by sending response headers and
  22. /// then closing. Each stream will be closed with the ":status" set to the value of the
  23. /// "response-status" header field in the request headers.
  24. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
  25. final class HTTP2StatusCodeServer {
  26. private let address: EventLoopPromise<GRPCHTTP2Core.SocketAddress.IPv4>
  27. private let eventLoopGroup: MultiThreadedEventLoopGroup
  28. var listeningAddress: GRPCHTTP2Core.SocketAddress.IPv4 {
  29. get async throws {
  30. try await self.address.futureResult.get()
  31. }
  32. }
  33. init() {
  34. self.eventLoopGroup = .singleton
  35. self.address = self.eventLoopGroup.next().makePromise()
  36. }
  37. func run() async throws {
  38. do {
  39. let channel = try await ServerBootstrap(group: .singletonMultiThreadedEventLoopGroup)
  40. .bind(host: "127.0.0.1", port: 0) { channel in
  41. channel.eventLoop.makeCompletedFuture {
  42. let sync = channel.pipeline.syncOperations
  43. let multiplexer = try sync.configureAsyncHTTP2Pipeline(mode: .server) { stream in
  44. stream.eventLoop.makeCompletedFuture {
  45. try NIOAsyncChannel<HTTP2Frame.FramePayload, HTTP2Frame.FramePayload>(
  46. wrappingChannelSynchronously: stream
  47. )
  48. }
  49. }
  50. let wrapped = try NIOAsyncChannel<HTTP2Frame, HTTP2Frame>(
  51. wrappingChannelSynchronously: channel
  52. )
  53. return (wrapped, multiplexer)
  54. }
  55. }
  56. let port = channel.channel.localAddress!.port!
  57. self.address.succeed(.init(host: "127.0.0.1", port: port))
  58. try await channel.executeThenClose { inbound in
  59. try await withThrowingTaskGroup(of: Void.self) { acceptedGroup in
  60. for try await (accepted, mux) in inbound {
  61. acceptedGroup.addTask {
  62. try await withThrowingTaskGroup(of: Void.self) { connectionGroup in
  63. // Run the connection.
  64. connectionGroup.addTask {
  65. try await accepted.executeThenClose { inbound, outbound in
  66. for try await _ in inbound {}
  67. }
  68. }
  69. // Consume the streams.
  70. for try await stream in mux.inbound {
  71. connectionGroup.addTask {
  72. try await stream.executeThenClose { inbound, outbound in
  73. do {
  74. for try await frame in inbound {
  75. switch frame {
  76. case .headers(let requestHeaders):
  77. if let status = requestHeaders.headers.first(name: "response-status") {
  78. let headers: HPACKHeaders = [":status": "\(status)"]
  79. try await outbound.write(
  80. .headers(.init(headers: headers, endStream: true))
  81. )
  82. }
  83. default:
  84. () // Ignore the others
  85. }
  86. }
  87. } catch {
  88. // Ignore errors
  89. }
  90. }
  91. }
  92. }
  93. }
  94. }
  95. }
  96. }
  97. }
  98. } catch {
  99. self.address.fail(error)
  100. }
  101. }
  102. }