GRPCWebToHTTP2ServerCodec.swift 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import struct Foundation.Data
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP1
  20. import NIOHTTP2
  /// A channel handler translating between gRPC Web (carried as HTTP/1 request and response
  /// parts) and HTTP/2 frame payloads.
  ///
  /// Every decision is delegated to a `StateMachine`; the handler itself only carries out the
  /// channel operations the state machine asks for.
  internal final class GRPCWebToHTTP2ServerCodec: ChannelDuplexHandler {
    internal typealias InboundIn = HTTPServerRequestPart
    internal typealias InboundOut = HTTP2Frame.FramePayload
    internal typealias OutboundIn = HTTP2Frame.FramePayload
    internal typealias OutboundOut = HTTPServerResponsePart

    /// Decides how each inbound request part and outbound frame payload is handled.
    private var stateMachine: StateMachine

    /// Creates a gRPC Web to server HTTP/2 codec.
    ///
    /// - Parameter scheme: The value of the ':scheme' pseudo header to insert when converting
    ///   the request headers.
    init(scheme: String) {
      self.stateMachine = StateMachine(scheme: scheme)
    }

    /// Feeds an inbound HTTP/1 request part to the state machine and performs the result.
    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let requestPart = self.unwrapInboundIn(data)
      let action = self.stateMachine.processInbound(
        serverRequestPart: requestPart,
        allocator: context.channel.allocator
      )
      self.act(on: action, context: context)
    }

    /// Feeds an outbound HTTP/2 frame payload to the state machine and performs the result.
    internal func write(
      context: ChannelHandlerContext,
      data: NIOAny,
      promise: EventLoopPromise<Void>?
    ) {
      let framePayload = self.unwrapOutboundIn(data)
      let action = self.stateMachine.processOutbound(
        framePayload: framePayload,
        promise: promise,
        allocator: context.channel.allocator
      )
      self.act(on: action, context: context)
    }

    /// Carries out an action returned by the state machine.
    private func act(on action: StateMachine.Action, context: ChannelHandlerContext) {
      switch action {
      case .none:
        return

      case let .fireChannelRead(payload):
        context.fireChannelRead(self.wrapInboundOut(payload))

      case let .completePromise(promise, result):
        promise?.completeWith(result)

      case let .write(write):
        // The caller's promise is always attached to the last part written.
        let hasAdditionalPart = write.additionalPart != nil
        context.write(
          self.wrapOutboundOut(write.part),
          promise: hasAdditionalPart ? nil : write.promise
        )
        if let additionalPart = write.additionalPart {
          context.write(self.wrapOutboundOut(additionalPart), promise: write.promise)
        }

        // NOTE(review): the close is issued without a flush in this handler — confirm that the
        // writes above are flushed elsewhere before the channel goes away.
        if write.closeChannel {
          context.close(mode: .all, promise: nil)
        }
      }
    }
  }
  extension GRPCWebToHTTP2ServerCodec {
    /// Tracks the progress of a request/response exchange and decides what the handler should do
    /// for each inbound request part and outbound frame payload.
    internal struct StateMachine {
      /// The current state.
      private var state: State
      /// The ':scheme' value handed to the state when a request part is processed.
      private let scheme: String

      internal init(scheme: String) {
        self.scheme = scheme
        self.state = .idle
      }

      /// Runs `body` on the current state while `self.state` temporarily holds `_modifying`,
      /// then stores the (possibly updated) state back.
      ///
      /// As the name suggests this is intended to avoid copy-on-write of the state's payloads
      /// while they are being mutated.
      private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
        var extracted: State = ._modifying
        swap(&self.state, &extracted)
        let action = body(&extracted)
        swap(&self.state, &extracted)
        return action
      }

      /// Process the inbound `HTTPServerRequestPart`.
      ///
      /// - Returns: The action the handler should take.
      internal mutating func processInbound(
        serverRequestPart: HTTPServerRequestPart,
        allocator: ByteBufferAllocator
      ) -> Action {
        // Read the scheme up front so the closure doesn't need to touch `self`.
        let scheme = self.scheme
        return self.withStateAvoidingCoWs { (state: inout State) -> Action in
          return state.processInbound(
            serverRequestPart: serverRequestPart,
            scheme: scheme,
            allocator: allocator
          )
        }
      }

      /// Process the outbound `HTTP2Frame.FramePayload`.
      ///
      /// - Returns: The action the handler should take.
      internal mutating func processOutbound(
        framePayload: HTTP2Frame.FramePayload,
        promise: EventLoopPromise<Void>?,
        allocator: ByteBufferAllocator
      ) -> Action {
        return self.withStateAvoidingCoWs { (state: inout State) -> Action in
          return state.processOutbound(
            framePayload: framePayload,
            promise: promise,
            allocator: allocator
          )
        }
      }

      /// An action to take as a result of interaction with the state machine.
      internal enum Action {
        /// Nothing to do.
        case none
        /// Forward the payload to the next inbound handler.
        case fireChannelRead(HTTP2Frame.FramePayload)
        /// Write one or two response parts, optionally closing the channel afterwards.
        case write(Write)
        /// Complete the promise (if there is one) with the given result.
        case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)

        internal struct Write {
          /// The part to write first.
          internal var part: HTTPServerResponsePart
          /// An optional part to write after `part`.
          internal var additionalPart: HTTPServerResponsePart?
          /// The promise to attach to the last part written.
          internal var promise: EventLoopPromise<Void>?
          /// Whether the channel should be closed after the parts have been written.
          internal var closeChannel: Bool

          internal init(
            part: HTTPServerResponsePart,
            additionalPart: HTTPServerResponsePart? = nil,
            promise: EventLoopPromise<Void>?,
            closeChannel: Bool
          ) {
            self.part = part
            self.additionalPart = additionalPart
            self.promise = promise
            self.closeChannel = closeChannel
          }
        }
      }

      fileprivate enum State {
        /// Nothing has been received or sent yet. The only valid transition is to 'fullyOpen'
        /// when receiving request headers.
        case idle

        /// Request headers were received; waiting for the end of the request and response
        /// streams.
        case fullyOpen(InboundState, OutboundState)

        /// The server has closed the response stream, the client may still send request parts.
        case clientOpenServerClosed

        /// The client has sent everything, the server still needs to close the response stream.
        case clientClosedServerOpen(OutboundState)

        /// Not a real state; a placeholder held while the real state is being modified.
        case _modifying
      }

      fileprivate struct InboundState {
        /// A `ByteBuffer` containing the base64 encoded bytes of the request stream if gRPC Web
        /// Text is being used, `nil` otherwise.
        var requestBuffer: ByteBuffer?

        init(isTextEncoded: Bool, allocator: ByteBufferAllocator) {
          if isTextEncoded {
            self.requestBuffer = allocator.buffer(capacity: 0)
          } else {
            self.requestBuffer = nil
          }
        }
      }

      fileprivate struct OutboundState {
        /// A `CircularBuffer` holding any response messages if gRPC Web Text is being used,
        /// `nil` otherwise.
        var responseBuffer: CircularBuffer<ByteBuffer>?

        /// True if the response headers have been sent.
        var responseHeadersSent: Bool

        /// True if the server should close the connection when this request is done.
        var closeConnection: Bool

        init(isTextEncoded: Bool, closeConnection: Bool) {
          self.responseHeadersSent = false
          if isTextEncoded {
            self.responseBuffer = CircularBuffer()
          } else {
            self.responseBuffer = nil
          }
          self.closeConnection = closeConnection
        }
      }
    }
  }
  extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
    /// Dispatches an inbound HTTP/1 request part to the handler for its kind.
    ///
    /// - Parameters:
    ///   - serverRequestPart: The request part which was read.
    ///   - scheme: The ':scheme' value, used when the part is a request head.
    ///   - allocator: An allocator for any buffers which need to be created.
    /// - Returns: The action the channel handler should take.
    fileprivate mutating func processInbound(
      serverRequestPart: HTTPServerRequestPart,
      scheme: String,
      allocator: ByteBufferAllocator
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      switch serverRequestPart {
      case .end:
        return self.processRequestEnd(allocator: allocator)

      case .body(var body):
        return self.processRequestBody(&body)

      case .head(let requestHead):
        return self.processRequestHead(requestHead, scheme: scheme, allocator: allocator)
      }
    }

    /// Dispatches an outbound HTTP/2 frame payload to the handler for its kind.
    ///
    /// Only HEADERS and DATA payloads are supported; any other payload is a programmer error and
    /// traps.
    ///
    /// - Returns: The action the channel handler should take.
    fileprivate mutating func processOutbound(
      framePayload: HTTP2Frame.FramePayload,
      promise: EventLoopPromise<Void>?,
      allocator: ByteBufferAllocator
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      switch framePayload {
      case .data(let dataPayload):
        return self.processResponseData(dataPayload, promise: promise)

      case .headers(let headersPayload):
        return self.processResponseHeaders(headersPayload, promise: promise, allocator: allocator)

      case .alternativeService,
           .goAway,
           .origin,
           .ping,
           .priority,
           .pushPromise,
           .rstStream,
           .settings,
           .windowUpdate:
        preconditionFailure("Unsupported frame payload")
      }
    }
  }
  216. // MARK: - Inbound
  217. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  /// Handles the request head: converts it to HTTP/2 request headers and opens both the request
  /// and the response side of the exchange.
  ///
  /// - Parameters:
  ///   - head: The HTTP/1 request head.
  ///   - scheme: The value to use for the ':scheme' pseudo header.
  ///   - allocator: Used to create the request buffer when gRPC Web Text is being used.
  /// - Returns: An action forwarding the converted headers as a HEADERS payload.
  /// - Precondition: No request head has been received since the state was last `idle`.
  private mutating func processRequestHead(
    _ head: HTTPRequestHead,
    scheme: String,
    allocator: ByteBufferAllocator
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    switch self {
    case .idle:
      let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)

      // Regular headers need to come after the pseudo headers. Unfortunately, this means we need
      // to allocate a second headers block to use the normalization provided by NIO HTTP/2.
      //
      // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid
      // the extra copy.
      var headers = HPACKHeaders()
      // Up to four pseudo headers are added in addition to the normalized headers.
      headers.reserveCapacity(normalized.count + 4)
      headers.add(name: ":path", value: head.uri)
      headers.add(name: ":method", value: head.method.rawValue)
      headers.add(name: ":scheme", value: scheme)
      if let host = head.headers.first(name: "host") {
        headers.add(name: ":authority", value: host)
      }
      headers.add(contentsOf: normalized)

      // Check whether we're dealing with gRPC Web Text. No need to fully validate the
      // content-type, that will be done at the HTTP/2 level.
      let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
      let isWebText = contentType == .some(.webTextProtobuf)

      // Connection options are case-insensitive (RFC 7230, section 6.1), so 'Close' or 'CLOSE'
      // must be honoured as well as 'close'.
      let closeConnection = head.headers[canonicalForm: "connection"].contains { option in
        option.lowercased() == "close"
      }

      self = .fullyOpen(
        .init(isTextEncoded: isWebText, allocator: allocator),
        .init(isTextEncoded: isWebText, closeConnection: closeConnection)
      )
      return .fireChannelRead(.headers(.init(headers: headers)))

    case .fullyOpen, .clientOpenServerClosed, .clientClosedServerOpen:
      preconditionFailure("Invalid state: already received request head")

    case ._modifying:
      preconditionFailure("Left in modifying state")
    }
  }
  /// Handles a chunk of the request body.
  ///
  /// When gRPC Web Text is not in use (there is no request buffer) the chunk is forwarded as a
  /// DATA payload unchanged. Otherwise the chunk is added to the bytes already held and the
  /// longest prefix whose length is a multiple of four is base64 decoded and forwarded; any
  /// remaining bytes stay in the request buffer. If nothing can be decoded no payload is
  /// forwarded.
  ///
  /// - Parameter buffer: The body bytes; reused to carry the decoded bytes when decoding
  ///   succeeds.
  /// - Returns: The action the channel handler should take.
  private mutating func processRequestBody(
    _ buffer: inout ByteBuffer
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    switch self {
    case ._modifying:
      preconditionFailure("Left in modifying state")

    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .clientClosedServerOpen:
      preconditionFailure("End of request stream already received")

    case .clientOpenServerClosed:
      // The server is already done; so drop the request.
      return .none

    case .fullyOpen(var inbound, let outbound):
      guard inbound.requestBuffer != nil else {
        // We're not dealing with gRPC Web Text: just forward the buffer.
        return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
      }

      // '!' is fine below: the guard above established there is a request buffer.
      if inbound.requestBuffer!.readableBytes > 0 {
        inbound.requestBuffer!.writeBuffer(&buffer)
      } else {
        // Nothing is held over from earlier chunks, so just adopt the new chunk.
        inbound.requestBuffer = buffer
      }

      // The length of base64 encoded data must be a multiple of 4, so only whole groups of four
      // bytes are consumed.
      let available = inbound.requestBuffer!.readableBytes
      let decodableLength = (available / 4) * 4

      var action = GRPCWebToHTTP2ServerCodec.StateMachine.Action.none
      if decodableLength > 0,
         let encoded = inbound.requestBuffer!.readString(length: decodableLength),
         let decoded = Data(base64Encoded: encoded) {
        // Recycle the input buffer to carry the decoded bytes.
        buffer.clear()
        buffer.writeContiguousBytes(decoded)
        action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
      }

      // Store the updated request buffer back into the state.
      self = .fullyOpen(inbound, outbound)
      return action
    }
  }
  /// Handles the end of the request stream.
  ///
  /// If the response stream is still open an empty DATA payload with end stream set is
  /// forwarded; if the server had already finished, the state returns to `idle` and nothing is
  /// forwarded.
  ///
  /// - Parameter allocator: Used to create the empty buffer for the final DATA payload.
  /// - Returns: The action the channel handler should take.
  private mutating func processRequestEnd(
    allocator: ByteBufferAllocator
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    switch self {
    case ._modifying:
      preconditionFailure("Left in modifying state")

    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .clientClosedServerOpen:
      preconditionFailure("End of request stream already received")

    case .clientOpenServerClosed:
      // Both sides are closed now, back to idle.
      self = .idle
      return .none

    case .fullyOpen(_, let outbound):
      // The inbound state is no longer needed; only the outbound state is carried forward.
      self = .clientClosedServerOpen(outbound)

      // Signal the end of the request stream with an empty DATA frame.
      let emptyBuffer = allocator.buffer(capacity: 0)
      return .fireChannelRead(.data(.init(data: .byteBuffer(emptyBuffer), endStream: true)))
    }
  }
  319. }
  320. // MARK: - Outbound
  321. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  /// Handles response trailers, closing the server side of the exchange.
  ///
  /// The state moves to `clientOpenServerClosed` if the request stream is still open, or back to
  /// `idle` if the client had already finished. Any buffered responses are handed over together
  /// with the trailers to be written.
  ///
  /// - Parameters:
  ///   - trailers: The trailers to send.
  ///   - promise: The promise for the write.
  ///   - allocator: Used when the trailers need encoding into a buffer.
  /// - Returns: The write action the channel handler should perform.
  private mutating func processResponseTrailers(
    _ trailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?,
    allocator: ByteBufferAllocator
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    var outbound: GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState

    switch self {
    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .clientOpenServerClosed:
      preconditionFailure("Already seen end of response stream")

    case ._modifying:
      preconditionFailure("Left in modifying state")

    case .fullyOpen(_, let current):
      // Double check these are trailers.
      assert(current.responseHeadersSent)
      // We haven't seen the end of the request stream yet.
      self = .clientOpenServerClosed
      outbound = current

    case .clientClosedServerOpen(let current):
      // Client is closed and now so is the server.
      self = .idle
      outbound = current
    }

    // Take the buffers out of the outbound state before handing them on (intended to avoid
    // CoW-ing them).
    let responseBuffers = outbound.responseBuffer
    outbound.responseBuffer = nil

    return self.processTrailers(
      responseBuffers: responseBuffers,
      trailers: trailers,
      promise: promise,
      allocator: allocator,
      closeChannel: outbound.closeConnection
    )
  }
  /// Builds the write action which ends the response.
  ///
  /// - Parameters:
  ///   - responseBuffers: The buffered responses when gRPC Web Text is being used, `nil`
  ///     otherwise.
  ///   - trailers: The trailers to send.
  ///   - promise: The promise for the write.
  ///   - allocator: Used to encode the trailers when `responseBuffers` is not `nil`.
  ///   - closeChannel: Whether the channel should be closed after writing.
  /// - Returns: With response buffers: a body part holding the encoded responses and trailers
  ///   followed by an end part without trailers. Without: a single end part carrying the
  ///   trailers.
  private func processTrailers(
    responseBuffers: CircularBuffer<ByteBuffer>?,
    trailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?,
    allocator: ByteBufferAllocator,
    closeChannel: Bool
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    guard var buffered = responseBuffers else {
      // No response buffer; plain gRPC Web.
      let httpTrailers = HTTPHeaders(hpackHeaders: trailers)
      return .write(.init(part: .end(httpTrailers), promise: promise, closeChannel: closeChannel))
    }

    let encoded = GRPCWebToHTTP2ServerCodec.encodeResponsesAndTrailers(
      &buffered,
      trailers: trailers,
      allocator: allocator
    )

    let write = GRPCWebToHTTP2ServerCodec.StateMachine.Action.Write(
      part: .body(.byteBuffer(encoded)),
      additionalPart: .end(nil),
      promise: promise,
      closeChannel: closeChannel
    )
    return .write(write)
  }
  /// Handles a 'trailers only' response: the headers are sent as the response head and are
  /// immediately followed by the end of the response.
  ///
  /// The state moves to `clientOpenServerClosed` if the request stream is still open, or back to
  /// `idle` if the client had already finished.
  ///
  /// - Parameters:
  ///   - trailers: The headers of the trailers-only response.
  ///   - promise: The promise for the write.
  /// - Returns: A write action for the head part followed by an end part.
  private mutating func processResponseTrailersOnly(
    _ trailers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    let closeConnection: Bool

    switch self {
    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .clientOpenServerClosed:
      preconditionFailure("Already seen end of response stream")

    case ._modifying:
      preconditionFailure("Left in modifying state")

    case .fullyOpen(_, let outbound):
      // We still haven't seen the end of the request stream.
      self = .clientOpenServerClosed
      closeConnection = outbound.closeConnection

    case .clientClosedServerOpen(let outbound):
      // We're done, back to idle.
      self = .idle
      closeConnection = outbound.closeConnection
    }

    let responseHead = GRPCWebToHTTP2ServerCodec.makeResponseHead(
      hpackHeaders: trailers,
      closeConnection: closeConnection
    )

    let write = GRPCWebToHTTP2ServerCodec.StateMachine.Action.Write(
      part: .head(responseHead),
      additionalPart: .end(nil),
      promise: promise,
      closeChannel: closeConnection
    )
    return .write(write)
  }
  /// Handles the initial response headers: records that they have been sent and converts them
  /// to an HTTP/1 response head.
  ///
  /// - Parameters:
  ///   - headers: The response headers.
  ///   - promise: The promise for the write.
  /// - Returns: A write action for the response head; the channel is never closed at this point.
  private mutating func processResponseHeaders(
    _ headers: HPACKHeaders,
    promise: EventLoopPromise<Void>?
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    let closeConnection: Bool

    switch self {
    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .clientOpenServerClosed:
      preconditionFailure("Already seen end of response stream")

    case ._modifying:
      preconditionFailure("Left in modifying state")

    case .fullyOpen(let inbound, var outbound):
      outbound.responseHeadersSent = true
      closeConnection = outbound.closeConnection
      self = .fullyOpen(inbound, outbound)

    case .clientClosedServerOpen(var outbound):
      outbound.responseHeadersSent = true
      closeConnection = outbound.closeConnection
      self = .clientClosedServerOpen(outbound)
    }

    let responseHead = GRPCWebToHTTP2ServerCodec.makeResponseHead(
      hpackHeaders: headers,
      closeConnection: closeConnection
    )
    return .write(.init(part: .head(responseHead), promise: promise, closeChannel: false))
  }
  /// Handles an outbound HEADERS payload, working out whether it carries the initial response
  /// headers, a trailers-only response or the response trailers.
  ///
  /// - Parameters:
  ///   - payload: The HEADERS payload.
  ///   - promise: The promise for the write.
  ///   - allocator: Used if trailers need encoding into a buffer.
  /// - Returns: The action the channel handler should take. If the response stream has already
  ///   ended the promise is failed with `GRPCError.AlreadyComplete`.
  private mutating func processResponseHeaders(
    _ payload: HTTP2Frame.FramePayload.Headers,
    promise: EventLoopPromise<Void>?,
    allocator: ByteBufferAllocator
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    let headersAlreadySent: Bool

    switch self {
    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case ._modifying:
      preconditionFailure("Left in modifying state")

    case .clientOpenServerClosed:
      // We've already sent end.
      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

    case .fullyOpen(_, let outbound),
         .clientClosedServerOpen(let outbound):
      headersAlreadySent = outbound.responseHeadersSent
    }

    guard headersAlreadySent else {
      if payload.endStream {
        // Headers haven't been sent yet and end stream is set: this is a trailers only response
        // so we need to send 'end' as well.
        return self.processResponseTrailersOnly(payload.headers, promise: promise)
      }
      return self.processResponseHeaders(payload.headers, promise: promise)
    }

    // Headers have been sent, these must be trailers, so end stream must be set.
    assert(payload.endStream)
    return self.processResponseTrailers(payload.headers, promise: promise, allocator: allocator)
  }
  /// Handles response data against the given outbound state.
  ///
  /// Without a response buffer (not gRPC Web Text) the data is written straight away. With one,
  /// the data is appended to the buffer and the promise is succeeded immediately.
  ///
  /// - Parameters:
  ///   - payload: The DATA payload.
  ///   - promise: The promise for the write.
  ///   - state: The outbound state whose response buffer may be appended to.
  /// - Returns: The action the channel handler should take.
  private func processResponseData(
    _ payload: HTTP2Frame.FramePayload.Data,
    promise: EventLoopPromise<Void>?,
    state: inout GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    guard state.responseBuffer != nil else {
      // Not gRPC Web Text; just write the body.
      return .write(.init(part: .body(payload.data), promise: promise, closeChannel: false))
    }

    guard case .byteBuffer(let bytes) = payload.data else {
      preconditionFailure("Unexpected IOData.fileRegion")
    }

    // '!' is fine, the first guard checked the buffer exists.
    state.responseBuffer!.append(bytes)

    // The response is buffered, we can consider it dealt with.
    return .completePromise(promise, .success(()))
  }
  /// Handles an outbound DATA payload in the current state.
  ///
  /// - Parameters:
  ///   - payload: The DATA payload.
  ///   - promise: The promise for the write.
  /// - Returns: The action the channel handler should take. If the response stream has already
  ///   ended the promise is failed with `GRPCError.AlreadyComplete`.
  private mutating func processResponseData(
    _ payload: HTTP2Frame.FramePayload.Data,
    promise: EventLoopPromise<Void>?
  ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
    switch self {
    case ._modifying:
      preconditionFailure("Left in modifying state")

    case .idle:
      preconditionFailure("Invalid state: haven't received request head")

    case .clientOpenServerClosed:
      return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

    case .clientClosedServerOpen(var outbound):
      let result = self.processResponseData(payload, promise: promise, state: &outbound)
      // Store the (possibly updated) outbound state back.
      self = .clientClosedServerOpen(outbound)
      return result

    case .fullyOpen(let inbound, var outbound):
      let result = self.processResponseData(payload, promise: promise, state: &outbound)
      // Store the (possibly updated) outbound state back.
      self = .fullyOpen(inbound, outbound)
      return result
    }
  }
  532. }
  533. // MARK: - Helpers
  534. extension GRPCWebToHTTP2ServerCodec {
  /// Builds an HTTP/1.1 response head from HTTP/2 response headers.
  ///
  /// - Parameters:
  ///   - hpackHeaders: The HTTP/2 headers; must contain a ':status' pseudo header holding an
  ///     integer.
  ///   - closeConnection: Whether a 'connection: close' header should be added.
  /// - Returns: The response head with the status taken from ':status'.
  private static func makeResponseHead(
    hpackHeaders: HPACKHeaders,
    closeConnection: Bool
  ) -> HTTPResponseHead {
    // Grab the status, if this is missing we've messed up in another handler.
    guard let rawStatus = hpackHeaders.first(name: ":status"),
          let statusCode = Int(rawStatus) else {
      preconditionFailure("Invalid state: missing ':status' pseudo header")
    }

    var httpHeaders = HTTPHeaders(hpackHeaders: hpackHeaders)
    if closeConnection {
      httpHeaders.add(name: "connection", value: "close")
    }

    return HTTPResponseHead(
      version: HTTPVersion(major: 1, minor: 1),
      status: HTTPResponseStatus(statusCode: statusCode),
      headers: httpHeaders
    )
  }
  /// Encodes trailers as a gRPC Web trailers frame.
  ///
  /// The frame is one byte with the most significant bit set (`0x80`, marking uncompressed
  /// trailers), a four byte length, then the trailers encoded as an HTTP/1 header block: one
  /// `name: value` line per trailer, each terminated by CRLF.
  ///
  /// - Parameters:
  ///   - trailers: The trailers to encode.
  ///   - allocator: Used to allocate the returned buffer.
  /// - Returns: A buffer containing the encoded trailers frame.
  private static func formatTrailers(
    _ trailers: HPACKHeaders,
    allocator: ByteBufferAllocator
  ) -> ByteBuffer {
    // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
    //
    // Every line, including the last, is terminated with CRLF: that is what an HTTP/1 header
    // block looks like, and a receiver reading line by line would otherwise see the final
    // trailer as unterminated.
    let encodedTrailers = trailers.map { name, value, _ in
      "\(name): \(value)\r\n"
    }.joined()
    let encodedLength = encodedTrailers.utf8.count

    // 5 bytes of prefix: 1 byte of flags and 4 bytes of length.
    var buffer = allocator.buffer(capacity: 5 + encodedLength)
    // Uncompressed trailer byte.
    buffer.writeInteger(UInt8(0x80))
    // Length.
    buffer.writeInteger(UInt32(encodedLength))
    // Uncompressed trailers.
    buffer.writeString(encodedTrailers)
    return buffer
  }
  /// Base64 encodes all buffered responses followed by the trailers into a single buffer.
  ///
  /// - Parameters:
  ///   - responses: The buffered responses; emptied by this call.
  ///   - trailers: The trailers to append after the responses.
  ///   - allocator: Used to allocate the buffer holding the formatted trailers.
  /// - Returns: A buffer containing the base64 encoding of the responses and trailers.
  private static func encodeResponsesAndTrailers(
    _ responses: inout CircularBuffer<ByteBuffer>,
    trailers: HPACKHeaders,
    allocator: ByteBufferAllocator
  ) -> ByteBuffer {
    // The trailers are encoded along with any responses we're holding.
    let trailersFrame = self.formatTrailers(trailers, allocator: allocator)
    responses.append(trailersFrame)

    let totalBytes = responses.reduce(0) { sum, response in
      sum + response.readableBytes
    }

    // '!' is fine: responses isn't empty, we just appended the trailers.
    var reusable = responses.popFirst()!

    // Gather everything into a single 'Data'. Ideally we wouldn't copy back and forth but this
    // is fine for now.
    var combined = reusable.readData(length: reusable.readableBytes)!
    combined.reserveCapacity(totalBytes)
    while let next = responses.popFirst() {
      combined.append(contentsOf: next.readableBytesView)
    }

    // The first buffer we popped is reused to hold the encoded output.
    let encoded = combined.base64EncodedString()
    reusable.clear(minimumCapacity: encoded.utf8.count)
    reusable.writeString(encoded)
    return reusable
  }
  593. }
  extension HTTPHeaders {
    /// Creates HTTP/1 headers from HTTP/2 headers, leaving out the leading pseudo-headers.
    ///
    /// Only the run of headers at the start of the block whose names begin with ':' is skipped;
    /// everything from the first regular header onwards is added.
    fileprivate init(hpackHeaders headers: HPACKHeaders) {
      self.init()
      self.reserveCapacity(headers.count)

      // Pseudo-headers are at the start of the block: skip them until the first regular header
      // is seen, then add everything which follows.
      let colon = UInt8(ascii: ":")
      var reachedRegularHeaders = false
      for (name, value, _) in headers {
        if !reachedRegularHeaders {
          if name.utf8.first == .some(colon) {
            continue
          }
          reachedRegularHeaders = true
        }
        self.add(name: name, value: value)
      }
    }
  }