// GRPCWebToHTTP2ServerCodec.swift
  /*
   * Copyright 2020, gRPC Authors All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  16. import struct Foundation.Data
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP1
  20. import NIOHTTP2
  /// A channel handler which translates between gRPC Web (carried as HTTP/1 request and response
  /// parts) and HTTP/2 frame payloads.
  ///
  /// The handler itself holds no protocol logic: every inbound read and outbound write is handed
  /// to a `StateMachine`, and the `Action` it returns is then carried out by `act(on:context:)`.
  internal final class GRPCWebToHTTP2ServerCodec: ChannelDuplexHandler {
    internal typealias InboundIn = HTTPServerRequestPart
    internal typealias InboundOut = HTTP2Frame.FramePayload

    internal typealias OutboundIn = HTTP2Frame.FramePayload
    internal typealias OutboundOut = HTTPServerResponsePart

    /// Decides what to do with each request part and each response frame payload.
    private var stateMachine: StateMachine

    /// Create a gRPC Web to server HTTP/2 codec.
    ///
    /// - Parameter scheme: The value of the ':scheme' pseudo header to insert when converting the
    ///   request headers.
    init(scheme: String) {
      self.stateMachine = StateMachine(scheme: scheme)
    }

    /// Unwraps an inbound HTTP/1 request part, feeds it to the state machine and performs the
    /// resulting action.
    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let requestPart = self.unwrapInboundIn(data)
      let outcome = self.stateMachine.processInbound(
        serverRequestPart: requestPart,
        allocator: context.channel.allocator
      )
      self.act(on: outcome, context: context)
    }

    /// Unwraps an outbound HTTP/2 frame payload, feeds it (and the write promise) to the state
    /// machine and performs the resulting action.
    internal func write(
      context: ChannelHandlerContext,
      data: NIOAny,
      promise: EventLoopPromise<Void>?
    ) {
      let payload = self.unwrapOutboundIn(data)
      let outcome = self.stateMachine.processOutbound(
        framePayload: payload,
        promise: promise,
        allocator: context.channel.allocator
      )
      self.act(on: outcome, context: context)
    }

    /// Acts on an action returned by the state machine.
    private func act(on action: StateMachine.Action, context: ChannelHandlerContext) {
      switch action {
      case .none:
        // Nothing to do.
        return

      case let .fireChannelRead(payload):
        context.fireChannelRead(self.wrapInboundOut(payload))

      case let .write(write):
        // A write carries one or two parts. When there are two, the first is written without a
        // promise and the caller's promise is attached to the last part written.
        let lastPart: HTTPServerResponsePart
        if let trailingPart = write.additionalPart {
          context.write(self.wrapOutboundOut(write.part), promise: nil)
          lastPart = trailingPart
        } else {
          lastPart = write.part
        }
        context.write(self.wrapOutboundOut(lastPart), promise: write.promise)

        if write.closeChannel {
          context.close(mode: .all, promise: nil)
        }

      case let .completePromise(promise, result):
        promise?.completeWith(result)
      }
    }
  }
  extension GRPCWebToHTTP2ServerCodec {
    /// Tracks the progress of a request/response exchange and maps each inbound request part and
    /// outbound frame payload to an `Action` for the channel handler to perform.
    internal struct StateMachine {
      /// The current state.
      private var state: State

      /// The value used for the ':scheme' pseudo header of converted requests.
      private let scheme: String

      internal init(scheme: String) {
        self.scheme = scheme
        self.state = .idle
      }

      /// Runs `body` against the current state while `self.state` temporarily holds the
      /// `_modifying` placeholder, then stores the (possibly mutated) state back.
      ///
      /// Swapping the state out means `body` works on the only copy of the state's payload.
      private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
        var detached: State = ._modifying
        swap(&self.state, &detached)
        // 'body' cannot throw, so swapping back after the call always runs.
        let action = body(&detached)
        swap(&self.state, &detached)
        return action
      }

      /// Process the inbound `HTTPServerRequestPart`.
      internal mutating func processInbound(
        serverRequestPart: HTTPServerRequestPart,
        allocator: ByteBufferAllocator
      ) -> Action {
        // Read the scheme up front: 'self' is being mutated while the closure runs.
        let scheme = self.scheme
        return self.withStateAvoidingCoWs { current in
          current.processInbound(
            serverRequestPart: serverRequestPart,
            scheme: scheme,
            allocator: allocator
          )
        }
      }

      /// Process the outbound `HTTP2Frame.FramePayload`.
      internal mutating func processOutbound(
        framePayload: HTTP2Frame.FramePayload,
        promise: EventLoopPromise<Void>?,
        allocator: ByteBufferAllocator
      ) -> Action {
        return self.withStateAvoidingCoWs { current in
          current.processOutbound(
            framePayload: framePayload,
            promise: promise,
            allocator: allocator
          )
        }
      }

      /// An action to take as a result of interaction with the state machine.
      internal enum Action {
        /// Do nothing.
        case none
        /// Forward the frame payload to the next inbound handler.
        case fireChannelRead(HTTP2Frame.FramePayload)
        /// Write one or two HTTP/1 response parts.
        case write(Write)
        /// Complete the promise (if there is one) with the given result.
        case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)

        /// The details of a write to perform.
        internal struct Write {
          /// The first (and possibly only) part to write.
          internal var part: HTTPServerResponsePart
          /// An optional second part, written after `part`.
          internal var additionalPart: HTTPServerResponsePart?
          /// The promise to attach to the last part written.
          internal var promise: EventLoopPromise<Void>?
          /// Whether the channel should be closed after the parts have been written.
          internal var closeChannel: Bool

          internal init(
            part: HTTPServerResponsePart,
            additionalPart: HTTPServerResponsePart? = nil,
            promise: EventLoopPromise<Void>?,
            closeChannel: Bool
          ) {
            self.part = part
            self.additionalPart = additionalPart
            self.promise = promise
            self.closeChannel = closeChannel
          }
        }
      }

      fileprivate enum State {
        /// Idle; nothing has been received or sent. The only valid transition is to 'fullyOpen'
        /// when receiving request headers.
        case idle

        /// Received request headers. Waiting for the end of request and response streams.
        case fullyOpen(InboundState, OutboundState)

        /// The server has closed the response stream, we may receive other request parts from the
        /// client.
        case clientOpenServerClosed(InboundState)

        /// The client has sent everything, the server still needs to close the response stream.
        case clientClosedServerOpen(OutboundState)

        /// Not a real state; a placeholder held while the real state is being modified.
        case _modifying
      }

      fileprivate struct InboundState {
        /// A `ByteBuffer` containing the base64 encoded bytes of the request stream if gRPC Web
        /// Text is being used, `nil` otherwise.
        var requestBuffer: ByteBuffer?

        init(isTextEncoded: Bool, allocator: ByteBufferAllocator) {
          if isTextEncoded {
            self.requestBuffer = allocator.buffer(capacity: 0)
          } else {
            self.requestBuffer = nil
          }
        }
      }

      fileprivate struct OutboundState {
        /// A `CircularBuffer` holding any response messages if gRPC Web Text is being used, `nil`
        /// otherwise.
        var responseBuffer: CircularBuffer<ByteBuffer>?

        /// True if the response headers have been sent.
        var responseHeadersSent: Bool

        /// True if the server should close the connection when this request is done.
        var closeConnection: Bool

        init(isTextEncoded: Bool, closeConnection: Bool) {
          self.responseHeadersSent = false
          self.closeConnection = closeConnection
          if isTextEncoded {
            self.responseBuffer = CircularBuffer()
          } else {
            self.responseBuffer = nil
          }
        }
      }
    }
  }
  extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
    /// Dispatches an inbound HTTP/1 request part to the handler for its kind.
    ///
    /// - Parameters:
    ///   - serverRequestPart: The request part which was read.
    ///   - scheme: The value to use for the ':scheme' pseudo header when converting a request head.
    ///   - allocator: Used to allocate any buffers required while processing the part.
    /// - Returns: The action the channel handler should take.
    fileprivate mutating func processInbound(
      serverRequestPart: HTTPServerRequestPart,
      scheme: String,
      allocator: ByteBufferAllocator
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      switch serverRequestPart {
      case .end:
        return self.processRequestEnd(allocator: allocator)

      case .body(var body):
        return self.processRequestBody(&body)

      case .head(let requestHead):
        return self.processRequestHead(requestHead, scheme: scheme, allocator: allocator)
      }
    }

    /// Dispatches an outbound HTTP/2 frame payload to the handler for its kind.
    ///
    /// Only HEADERS and DATA payloads are supported; any other payload is a programmer error and
    /// traps.
    ///
    /// - Parameters:
    ///   - framePayload: The payload being written.
    ///   - promise: The promise associated with the write, if any.
    ///   - allocator: Used to allocate any buffers required while processing the payload.
    /// - Returns: The action the channel handler should take.
    fileprivate mutating func processOutbound(
      framePayload: HTTP2Frame.FramePayload,
      promise: EventLoopPromise<Void>?,
      allocator: ByteBufferAllocator
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      switch framePayload {
      case .data(let dataPayload):
        return self.processResponseData(dataPayload, promise: promise)

      case .headers(let headersPayload):
        return self.processResponseHeaders(headersPayload, promise: promise, allocator: allocator)

      case .priority, .rstStream, .settings, .pushPromise, .ping, .goAway, .windowUpdate,
           .alternativeService, .origin:
        preconditionFailure("Unsupported frame payload")
      }
    }
  }
  // MARK: - Inbound

  extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
    /// Converts an HTTP/1 request head into an HTTP/2 HEADERS payload and moves from 'idle' to
    /// 'fullyOpen'.
    ///
    /// - Parameters:
    ///   - head: The request head which was received.
    ///   - scheme: The value to use for the ':scheme' pseudo header.
    ///   - allocator: Used to allocate the request buffer when gRPC Web Text is in use.
    /// - Returns: A `.fireChannelRead` action carrying the converted headers.
    /// - Precondition: The state must be 'idle'.
    private mutating func processRequestHead(
      _ head: HTTPRequestHead,
      scheme: String,
      allocator: ByteBufferAllocator
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      switch self {
      case .idle:
        let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)

        // Regular headers need to come after the pseudo headers. Unfortunately, this means we need
        // to allocate a second headers block to use the normalization provided by NIO HTTP/2.
        //
        // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid
        // the extra copy.
        var headers = HPACKHeaders()
        headers.reserveCapacity(normalized.count + 4)
        headers.add(name: ":path", value: head.uri)
        headers.add(name: ":method", value: head.method.rawValue)
        headers.add(name: ":scheme", value: scheme)
        if let host = head.headers.first(name: "host") {
          headers.add(name: ":authority", value: host)
        }
        headers.add(contentsOf: normalized)

        // Check whether we're dealing with gRPC Web Text. No need to fully validate the
        // content-type that will be done at the HTTP/2 level.
        let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
        let isWebText = contentType == .some(.webTextProtobuf)

        // Connection options are case-insensitive (RFC 7230, section 6.1), so "Close" and "CLOSE"
        // must be honoured as well as "close".
        let closeConnection = head.headers[canonicalForm: "connection"].contains { option in
          option.lowercased() == "close"
        }

        self = .fullyOpen(
          .init(isTextEncoded: isWebText, allocator: allocator),
          .init(isTextEncoded: isWebText, closeConnection: closeConnection)
        )
        return .fireChannelRead(.headers(.init(headers: headers)))

      case .fullyOpen, .clientOpenServerClosed, .clientClosedServerOpen:
        preconditionFailure("Invalid state: already received request head")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Processes a chunk of the request body while the request stream is still open.
    ///
    /// - Parameter buffer: The body bytes which were received.
    /// - Returns: The action produced by the inbound state for these bytes.
    /// - Precondition: The request head must have been received and the request end must not have
    ///   been.
    private mutating func processRequestBody(
      _ buffer: inout ByteBuffer
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      switch self {
      case .idle:
        preconditionFailure("Invalid state: haven't received request head")

      case .fullyOpen(var inbound, let outbound):
        let action = inbound.processInboundData(buffer: &buffer)
        self = .fullyOpen(inbound, outbound)
        return action

      case var .clientOpenServerClosed(inbound):
        // The server is already done, but it's not our place to drop the request.
        let action = inbound.processInboundData(buffer: &buffer)
        self = .clientOpenServerClosed(inbound)
        return action

      case .clientClosedServerOpen:
        preconditionFailure("End of request stream already received")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Processes the end of the request stream.
    ///
    /// - Parameter allocator: Used to allocate the empty buffer for the final DATA frame.
    /// - Returns: A `.fireChannelRead` action carrying an empty DATA payload with end stream set.
    /// - Precondition: The request head must have been received and the request end must not have
    ///   been.
    private mutating func processRequestEnd(
      allocator: ByteBufferAllocator
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      switch self {
      case .idle:
        preconditionFailure("Invalid state: haven't received request head")

      case let .fullyOpen(_, outbound):
        // We're done with inbound state.
        self = .clientClosedServerOpen(outbound)

        // Send an empty DATA frame with the end stream flag set.
        let empty = allocator.buffer(capacity: 0)
        return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))

      case .clientClosedServerOpen:
        preconditionFailure("End of request stream already received")

      case .clientOpenServerClosed:
        // Both sides are closed now, back to idle. Don't forget to pass on the .end, as
        // it's necessary to communicate to the other peers that the response is done.
        self = .idle

        // Send an empty DATA frame with the end stream flag set.
        let empty = allocator.buffer(capacity: 0)
        return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }
  }
  // MARK: - Outbound

  extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
    /// Handles trailers sent after the response headers, closing the response stream.
    ///
    /// Moves 'fullyOpen' to 'clientOpenServerClosed' and 'clientClosedServerOpen' to 'idle', then
    /// builds the write for the trailers (and any buffered responses).
    private mutating func processResponseTrailers(
      _ trailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?,
      allocator: ByteBufferAllocator
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      var outbound: GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState

      switch self {
      case .idle:
        preconditionFailure("Invalid state: haven't received request head")

      case let .fullyOpen(inbound, open):
        // Double check these are trailers.
        assert(open.responseHeadersSent)
        // We haven't seen the end of the request stream yet.
        self = .clientOpenServerClosed(inbound)
        outbound = open

      case let .clientClosedServerOpen(open):
        // Client is closed and now so is the server.
        self = .idle
        outbound = open

      case .clientOpenServerClosed:
        preconditionFailure("Already seen end of response stream")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }

      // Take the buffered responses out of the state to avoid CoW-ing the buffers.
      let pendingResponses = outbound.responseBuffer
      outbound.responseBuffer = nil

      return self.processTrailers(
        responseBuffers: pendingResponses,
        trailers: trailers,
        promise: promise,
        allocator: allocator,
        closeChannel: outbound.closeConnection
      )
    }

    /// Builds the write which ends the response.
    ///
    /// With a response buffer (gRPC Web Text) the buffered responses and trailers are encoded into
    /// a single body part followed by an end part without trailers. Without one (plain gRPC Web)
    /// the trailers are sent in the end part.
    private func processTrailers(
      responseBuffers: CircularBuffer<ByteBuffer>?,
      trailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?,
      allocator: ByteBufferAllocator,
      closeChannel: Bool
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      guard var buffered = responseBuffers else {
        // No response buffer; plain gRPC Web.
        let httpTrailers = HTTPHeaders(hpackHeaders: trailers)
        return .write(
          .init(part: .end(httpTrailers), promise: promise, closeChannel: closeChannel)
        )
      }

      let encoded = GRPCWebToHTTP2ServerCodec.encodeResponsesAndTrailers(
        &buffered,
        trailers: trailers,
        allocator: allocator
      )

      return .write(
        .init(
          part: .body(.byteBuffer(encoded)),
          additionalPart: .end(nil),
          promise: promise,
          closeChannel: closeChannel
        )
      )
    }

    /// Handles a "trailers only" response: headers with end stream set before any response
    /// headers were sent. The headers become the response head, immediately followed by 'end'.
    private mutating func processResponseTrailersOnly(
      _ trailers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      let outbound: GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState

      switch self {
      case .idle:
        preconditionFailure("Invalid state: haven't received request head")

      case let .fullyOpen(inbound, open):
        // We still haven't seen the end of the request stream.
        self = .clientOpenServerClosed(inbound)
        outbound = open

      case let .clientClosedServerOpen(open):
        // We're done, back to idle.
        self = .idle
        outbound = open

      case .clientOpenServerClosed:
        preconditionFailure("Already seen end of response stream")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }

      let shouldClose = outbound.closeConnection
      let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
        hpackHeaders: trailers,
        closeConnection: shouldClose
      )

      return .write(
        .init(
          part: .head(head),
          additionalPart: .end(nil),
          promise: promise,
          closeChannel: shouldClose
        )
      )
    }

    /// Handles the initial response headers: records that they have been sent and writes the
    /// corresponding HTTP/1 response head. The state case itself does not change.
    private mutating func processResponseHeaders(
      _ headers: HPACKHeaders,
      promise: EventLoopPromise<Void>?
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      let shouldClose: Bool

      switch self {
      case .idle:
        preconditionFailure("Invalid state: haven't received request head")

      case .fullyOpen(let inbound, var outbound):
        outbound.responseHeadersSent = true
        self = .fullyOpen(inbound, outbound)
        shouldClose = outbound.closeConnection

      case .clientClosedServerOpen(var outbound):
        outbound.responseHeadersSent = true
        self = .clientClosedServerOpen(outbound)
        shouldClose = outbound.closeConnection

      case .clientOpenServerClosed:
        preconditionFailure("Already seen end of response stream")

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }

      let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
        hpackHeaders: headers,
        closeConnection: shouldClose
      )
      return .write(.init(part: .head(head), promise: promise, closeChannel: false))
    }

    /// Routes a HEADERS payload to the handler for response headers, trailers, or a trailers-only
    /// response, depending on whether headers were already sent and whether end stream is set.
    ///
    /// If the response stream has already been closed the promise is failed with
    /// `GRPCError.AlreadyComplete`.
    private mutating func processResponseHeaders(
      _ payload: HTTP2Frame.FramePayload.Headers,
      promise: EventLoopPromise<Void>?,
      allocator: ByteBufferAllocator
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      switch self {
      case .idle:
        preconditionFailure("Invalid state: haven't received request head")

      case let .fullyOpen(_, outbound),
           let .clientClosedServerOpen(outbound):
        switch (outbound.responseHeadersSent, payload.endStream) {
        case (true, _):
          // Headers have been sent, these must be trailers, so end stream must be set.
          assert(payload.endStream)
          return self.processResponseTrailers(
            payload.headers,
            promise: promise,
            allocator: allocator
          )

        case (false, true):
          // Headers haven't been sent yet and end stream is set: this is a trailers only response
          // so we need to send 'end' as well.
          return self.processResponseTrailersOnly(payload.headers, promise: promise)

        case (false, false):
          return self.processResponseHeaders(payload.headers, promise: promise)
        }

      case .clientOpenServerClosed:
        // We've already sent end.
        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }

    /// Either writes the response data straight away (plain gRPC Web) or appends it to the
    /// response buffer held in `state` (gRPC Web Text) and completes the promise successfully.
    private func processResponseData(
      _ payload: HTTP2Frame.FramePayload.Data,
      promise: EventLoopPromise<Void>?,
      state: inout GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      guard state.responseBuffer != nil else {
        // Not gRPC Web Text; just write the body.
        return .write(.init(part: .body(payload.data), promise: promise, closeChannel: false))
      }

      switch payload.data {
      case .fileRegion:
        preconditionFailure("Unexpected IOData.fileRegion")

      case let .byteBuffer(bytes):
        // '!' is fine, the guard above checked for 'nil'.
        state.responseBuffer!.append(bytes)
      }

      // The response is buffered, we can consider it dealt with.
      return .completePromise(promise, .success(()))
    }

    /// Handles a DATA payload while the response stream is open; the state case does not change.
    ///
    /// If the response stream has already been closed the promise is failed with
    /// `GRPCError.AlreadyComplete`.
    private mutating func processResponseData(
      _ payload: HTTP2Frame.FramePayload.Data,
      promise: EventLoopPromise<Void>?
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      switch self {
      case .idle:
        preconditionFailure("Invalid state: haven't received request head")

      case .fullyOpen(let inbound, var outbound):
        let result = self.processResponseData(payload, promise: promise, state: &outbound)
        self = .fullyOpen(inbound, outbound)
        return result

      case .clientClosedServerOpen(var outbound):
        let result = self.processResponseData(payload, promise: promise, state: &outbound)
        self = .clientClosedServerOpen(outbound)
        return result

      case .clientOpenServerClosed:
        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

      case ._modifying:
        preconditionFailure("Left in modifying state")
      }
    }
  }
  // MARK: - Helpers

  extension GRPCWebToHTTP2ServerCodec {
    /// Builds an HTTP/1.1 response head from a block of HTTP/2 response headers.
    ///
    /// - Parameters:
    ///   - hpackHeaders: The response headers; must contain a ':status' pseudo header whose value
    ///     parses as an integer.
    ///   - closeConnection: Whether a 'connection: close' header should be added.
    /// - Returns: The response head, with pseudo headers removed from its header block.
    /// - Precondition: ':status' must be present and numeric.
    private static func makeResponseHead(
      hpackHeaders: HPACKHeaders,
      closeConnection: Bool
    ) -> HTTPResponseHead {
      var headers = HTTPHeaders(hpackHeaders: hpackHeaders)

      if closeConnection {
        headers.add(name: "connection", value: "close")
      }

      // Grab the status, if this is missing we've messed up in another handler.
      guard let statusCode = hpackHeaders.first(name: ":status").flatMap(Int.init) else {
        preconditionFailure("Invalid state: missing ':status' pseudo header")
      }

      return HTTPResponseHead(
        version: .init(major: 1, minor: 1),
        status: .init(statusCode: statusCode),
        headers: headers
      )
    }

    /// Encodes trailers as a gRPC Web trailers frame: a one-byte flag (0x80), a four-byte length
    /// and then the trailers as "name: value" lines.
    ///
    /// Every line, including the last, is terminated with CRLF, matching the header block layout
    /// shown in the gRPC Web protocol description.
    ///
    /// - Parameters:
    ///   - trailers: The trailers to encode.
    ///   - allocator: Used to allocate the returned buffer.
    /// - Returns: A buffer containing the framed trailers.
    private static func formatTrailers(
      _ trailers: HPACKHeaders,
      allocator: ByteBufferAllocator
    ) -> ByteBuffer {
      // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
      let encodedTrailers = trailers.map { name, value, _ in
        "\(name): \(value)\r\n"
      }.joined()
      let encodedLength = encodedTrailers.utf8.count

      // 5 bytes of framing: the flag byte and the length.
      var buffer = allocator.buffer(capacity: 5 + encodedLength)
      // Uncompressed trailer byte.
      buffer.writeInteger(UInt8(0x80))
      // Length.
      buffer.writeInteger(UInt32(encodedLength))
      // Uncompressed trailers.
      buffer.writeString(encodedTrailers)

      return buffer
    }

    /// Base64 encodes all buffered responses followed by the framed trailers into one buffer.
    ///
    /// - Parameters:
    ///   - responses: The buffered response messages; emptied by this call.
    ///   - trailers: The trailers to append after the responses.
    ///   - allocator: Used to allocate the buffer holding the framed trailers.
    /// - Returns: A buffer containing the base64 encoding of the responses and trailers.
    private static func encodeResponsesAndTrailers(
      _ responses: inout CircularBuffer<ByteBuffer>,
      trailers: HPACKHeaders,
      allocator: ByteBufferAllocator
    ) -> ByteBuffer {
      // We need to encode the trailers along with any responses we're holding.
      responses.append(self.formatTrailers(trailers, allocator: allocator))

      let capacity = responses.lazy.map { $0.readableBytes }.reduce(0, +)
      // '!' is fine: responses isn't empty, we just appended the trailers.
      var buffer = responses.popFirst()!

      // Accumulate all the buffers into a single 'Data'. Ideally we wouldn't copy back and forth
      // but this is fine for now.
      // '!' is fine: we read exactly the number of readable bytes.
      var accumulatedData = buffer.readData(length: buffer.readableBytes)!
      accumulatedData.reserveCapacity(capacity)
      while let buffer = responses.popFirst() {
        accumulatedData.append(contentsOf: buffer.readableBytesView)
      }

      // We can reuse the popped buffer.
      let base64Encoded = accumulatedData.base64EncodedString()
      buffer.clear(minimumCapacity: base64Encoded.utf8.count)
      buffer.writeString(base64Encoded)

      return buffer
    }
  }
  extension GRPCWebToHTTP2ServerCodec.StateMachine.InboundState {
    /// Processes a chunk of request body bytes.
    ///
    /// Without a request buffer (i.e. not gRPC Web Text) the bytes are forwarded unchanged.
    /// Otherwise the bytes are added to the request buffer and the longest readable prefix whose
    /// length is a multiple of four is read out and base64 decoded.
    ///
    /// - Parameter buffer: The received bytes. When data is decoded this buffer is cleared and
    ///   reused to hold the decoded bytes.
    /// - Returns: `.fireChannelRead` with a DATA payload when there are bytes to forward, `.none`
    ///   when fewer than four bytes are buffered or the bytes read could not be decoded.
    fileprivate mutating func processInboundData(
      buffer: inout ByteBuffer
    ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
      guard self.requestBuffer != nil else {
        // We're not dealing with gRPC Web Text: just forward the buffer.
        return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
      }

      // '!' is fine from here on, the guard above checked for 'nil'.
      if self.requestBuffer!.readableBytes > 0 {
        self.requestBuffer!.writeBuffer(&buffer)
      } else {
        // Nothing is pending, so the incoming buffer becomes the request buffer.
        self.requestBuffer = buffer
      }

      // The length of base64 encoded data must be a multiple of 4.
      let pendingBytes = self.requestBuffer!.readableBytes
      let decodableBytes = (pendingBytes / 4) * 4

      guard decodableBytes > 0,
        let encoded = self.requestBuffer!.readString(length: decodableBytes),
        let decoded = Data(base64Encoded: encoded) else {
        return .none
      }

      // Recycle the input buffer to carry the decoded bytes.
      buffer.clear()
      buffer.writeContiguousBytes(decoded)
      return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
    }
  }
  extension HTTPHeaders {
    /// Creates HTTP/1 headers from an HPACK header block, leaving out the leading run of
    /// pseudo-headers (names starting with ':').
    ///
    /// Only pseudo-headers at the start of the block are skipped; once a regular header has been
    /// seen every remaining header is added.
    fileprivate init(hpackHeaders headers: HPACKHeaders) {
      self.init()
      self.reserveCapacity(headers.count)

      let colon = UInt8(ascii: ":")
      // Pseudo-headers are at the start of the block, so skip them and then add the remaining.
      var inPseudoHeaderPrefix = true

      for (name, value, _) in headers {
        if inPseudoHeaderPrefix {
          if name.utf8.first == .some(colon) {
            continue
          }
          inPseudoHeaderPrefix = false
        }
        self.add(name: name, value: value)
      }
    }
  }