GRPCWebToHTTP2ServerCodec.swift 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import struct Foundation.Data
  17. import NIOCore
  18. import NIOHPACK
  19. import NIOHTTP1
  20. import NIOHTTP2
  21. /// A codec for translating between gRPC Web (as HTTP/1) and HTTP/2 frame payloads.
  22. internal final class GRPCWebToHTTP2ServerCodec: ChannelDuplexHandler {
  23. internal typealias InboundIn = HTTPServerRequestPart
  24. internal typealias InboundOut = HTTP2Frame.FramePayload
  25. internal typealias OutboundIn = HTTP2Frame.FramePayload
  26. internal typealias OutboundOut = HTTPServerResponsePart
  27. private var stateMachine: StateMachine
  28. /// Create a gRPC Web to server HTTP/2 codec.
  29. ///
  30. /// - Parameter scheme: The value of the ':scheme' pseudo header to insert when converting the
  31. /// request headers.
  32. init(scheme: String) {
  33. self.stateMachine = StateMachine(scheme: scheme)
  34. }
  35. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  36. let action = self.stateMachine.processInbound(
  37. serverRequestPart: self.unwrapInboundIn(data),
  38. allocator: context.channel.allocator
  39. )
  40. self.act(on: action, context: context)
  41. }
  42. internal func write(
  43. context: ChannelHandlerContext,
  44. data: NIOAny,
  45. promise: EventLoopPromise<Void>?
  46. ) {
  47. let action = self.stateMachine.processOutbound(
  48. framePayload: self.unwrapOutboundIn(data),
  49. promise: promise,
  50. allocator: context.channel.allocator
  51. )
  52. self.act(on: action, context: context)
  53. }
  54. /// Acts on an action returned by the state machine.
  55. private func act(on action: StateMachine.Action, context: ChannelHandlerContext) {
  56. switch action {
  57. case .none:
  58. ()
  59. case let .fireChannelRead(payload):
  60. context.fireChannelRead(self.wrapInboundOut(payload))
  61. case let .write(write):
  62. if let additionalPart = write.additionalPart {
  63. context.write(self.wrapOutboundOut(write.part), promise: nil)
  64. context.write(self.wrapOutboundOut(additionalPart), promise: write.promise)
  65. } else {
  66. context.write(self.wrapOutboundOut(write.part), promise: write.promise)
  67. }
  68. if write.closeChannel {
  69. context.close(mode: .all, promise: nil)
  70. }
  71. case let .completePromise(promise, result):
  72. promise?.completeWith(result)
  73. }
  74. }
  75. }
  76. extension GRPCWebToHTTP2ServerCodec {
  77. internal struct StateMachine {
  78. /// The current state.
  79. private var state: State
  80. private let scheme: String
  81. internal init(scheme: String) {
  82. self.state = .idle
  83. self.scheme = scheme
  84. }
  85. /// Process the inbound `HTTPServerRequestPart`.
  86. internal mutating func processInbound(
  87. serverRequestPart: HTTPServerRequestPart,
  88. allocator: ByteBufferAllocator
  89. ) -> Action {
  90. return self.state.processInbound(
  91. serverRequestPart: serverRequestPart,
  92. scheme: self.scheme,
  93. allocator: allocator
  94. )
  95. }
  96. /// Process the outbound `HTTP2Frame.FramePayload`.
  97. internal mutating func processOutbound(
  98. framePayload: HTTP2Frame.FramePayload,
  99. promise: EventLoopPromise<Void>?,
  100. allocator: ByteBufferAllocator
  101. ) -> Action {
  102. return self.state.processOutbound(
  103. framePayload: framePayload,
  104. promise: promise,
  105. allocator: allocator
  106. )
  107. }
  108. /// An action to take as a result of interaction with the state machine.
  109. internal enum Action {
  110. case none
  111. case fireChannelRead(HTTP2Frame.FramePayload)
  112. case write(Write)
  113. case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)
  114. internal struct Write {
  115. internal var part: HTTPServerResponsePart
  116. internal var additionalPart: HTTPServerResponsePart?
  117. internal var promise: EventLoopPromise<Void>?
  118. internal var closeChannel: Bool
  119. internal init(
  120. part: HTTPServerResponsePart,
  121. additionalPart: HTTPServerResponsePart? = nil,
  122. promise: EventLoopPromise<Void>?,
  123. closeChannel: Bool
  124. ) {
  125. self.part = part
  126. self.additionalPart = additionalPart
  127. self.promise = promise
  128. self.closeChannel = closeChannel
  129. }
  130. }
  131. }
  132. fileprivate enum State {
  133. /// Idle; nothing has been received or sent. The only valid transition is to 'fullyOpen' when
  134. /// receiving request headers.
  135. case idle
  136. /// Received request headers. Waiting for the end of request and response streams.
  137. case fullyOpen(InboundState, OutboundState)
  138. /// The server has closed the response stream, we may receive other request parts from the client.
  139. case clientOpenServerClosed(InboundState)
  140. /// The client has sent everything, the server still needs to close the response stream.
  141. case clientClosedServerOpen(OutboundState)
  142. /// Not a real state.
  143. case _modifying
  144. private var isModifying: Bool {
  145. switch self {
  146. case ._modifying:
  147. return true
  148. case .idle, .fullyOpen, .clientClosedServerOpen, .clientOpenServerClosed:
  149. return false
  150. }
  151. }
  152. private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
  153. self = ._modifying
  154. defer {
  155. assert(!self.isModifying)
  156. }
  157. return body(&self)
  158. }
  159. }
  160. fileprivate struct InboundState {
  161. /// A `ByteBuffer` containing the base64 encoded bytes of the request stream if gRPC Web Text
  162. /// is being used, `nil` otherwise.
  163. var requestBuffer: ByteBuffer?
  164. init(isTextEncoded: Bool, allocator: ByteBufferAllocator) {
  165. self.requestBuffer = isTextEncoded ? allocator.buffer(capacity: 0) : nil
  166. }
  167. }
  168. fileprivate struct OutboundState {
  169. /// A `CircularBuffer` holding any response messages if gRPC Web Text is being used, `nil`
  170. /// otherwise.
  171. var responseBuffer: CircularBuffer<ByteBuffer>?
  172. /// True if the response headers have been sent.
  173. var responseHeadersSent: Bool
  174. /// True if the server should close the connection when this request is done.
  175. var closeConnection: Bool
  176. init(isTextEncoded: Bool, closeConnection: Bool) {
  177. self.responseHeadersSent = false
  178. self.responseBuffer = isTextEncoded ? CircularBuffer() : nil
  179. self.closeConnection = closeConnection
  180. }
  181. }
  182. }
  183. }
  184. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  185. fileprivate mutating func processInbound(
  186. serverRequestPart: HTTPServerRequestPart,
  187. scheme: String,
  188. allocator: ByteBufferAllocator
  189. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  190. switch serverRequestPart {
  191. case let .head(head):
  192. return self.processRequestHead(head, scheme: scheme, allocator: allocator)
  193. case var .body(buffer):
  194. return self.processRequestBody(&buffer)
  195. case .end:
  196. return self.processRequestEnd(allocator: allocator)
  197. }
  198. }
  199. fileprivate mutating func processOutbound(
  200. framePayload: HTTP2Frame.FramePayload,
  201. promise: EventLoopPromise<Void>?,
  202. allocator: ByteBufferAllocator
  203. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  204. switch framePayload {
  205. case let .headers(payload):
  206. return self.processResponseHeaders(payload, promise: promise, allocator: allocator)
  207. case let .data(payload):
  208. return self.processResponseData(payload, promise: promise)
  209. case .priority,
  210. .rstStream,
  211. .settings,
  212. .pushPromise,
  213. .ping,
  214. .goAway,
  215. .windowUpdate,
  216. .alternativeService,
  217. .origin:
  218. preconditionFailure("Unsupported frame payload")
  219. }
  220. }
  221. }
  222. // MARK: - Inbound
  223. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  224. private mutating func processRequestHead(
  225. _ head: HTTPRequestHead,
  226. scheme: String,
  227. allocator: ByteBufferAllocator
  228. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  229. switch self {
  230. case .idle:
  231. return self.withStateAvoidingCoWs { state in
  232. let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)
  233. // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to
  234. // allocate a second headers block to use the normalization provided by NIO HTTP/2.
  235. //
  236. // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the
  237. // extra copy.
  238. var headers = HPACKHeaders()
  239. headers.reserveCapacity(normalized.count + 4)
  240. headers.add(name: ":path", value: head.uri)
  241. headers.add(name: ":method", value: head.method.rawValue)
  242. headers.add(name: ":scheme", value: scheme)
  243. if let host = head.headers.first(name: "host") {
  244. headers.add(name: ":authority", value: host)
  245. }
  246. headers.add(contentsOf: normalized)
  247. // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type
  248. // that will be done at the HTTP/2 level.
  249. let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
  250. let isWebText = contentType == .some(.webTextProtobuf)
  251. let closeConnection = head.headers[canonicalForm: "connection"].contains("close")
  252. state = .fullyOpen(
  253. .init(isTextEncoded: isWebText, allocator: allocator),
  254. .init(isTextEncoded: isWebText, closeConnection: closeConnection)
  255. )
  256. return .fireChannelRead(.headers(.init(headers: headers)))
  257. }
  258. case .fullyOpen, .clientOpenServerClosed, .clientClosedServerOpen:
  259. preconditionFailure("Invalid state: already received request head")
  260. case ._modifying:
  261. preconditionFailure("Left in modifying state")
  262. }
  263. }
  264. private mutating func processRequestBody(
  265. _ buffer: inout ByteBuffer
  266. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  267. switch self {
  268. case .idle:
  269. preconditionFailure("Invalid state: haven't received request head")
  270. case .fullyOpen(var inbound, let outbound):
  271. return self.withStateAvoidingCoWs { state in
  272. let action = inbound.processInboundData(buffer: &buffer)
  273. state = .fullyOpen(inbound, outbound)
  274. return action
  275. }
  276. case var .clientOpenServerClosed(inbound):
  277. // The server is already done, but it's not our place to drop the request.
  278. return self.withStateAvoidingCoWs { state in
  279. let action = inbound.processInboundData(buffer: &buffer)
  280. state = .clientOpenServerClosed(inbound)
  281. return action
  282. }
  283. case .clientClosedServerOpen:
  284. preconditionFailure("End of request stream already received")
  285. case ._modifying:
  286. preconditionFailure("Left in modifying state")
  287. }
  288. }
  289. private mutating func processRequestEnd(
  290. allocator: ByteBufferAllocator
  291. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  292. switch self {
  293. case .idle:
  294. preconditionFailure("Invalid state: haven't received request head")
  295. case let .fullyOpen(_, outbound):
  296. return self.withStateAvoidingCoWs { state in
  297. // We're done with inbound state.
  298. state = .clientClosedServerOpen(outbound)
  299. // Send an empty DATA frame with the end stream flag set.
  300. let empty = allocator.buffer(capacity: 0)
  301. return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
  302. }
  303. case .clientClosedServerOpen:
  304. preconditionFailure("End of request stream already received")
  305. case .clientOpenServerClosed:
  306. return self.withStateAvoidingCoWs { state in
  307. // Both sides are closed now, back to idle. Don't forget to pass on the .end, as
  308. // it's necessary to communicate to the other peers that the response is done.
  309. state = .idle
  310. // Send an empty DATA frame with the end stream flag set.
  311. let empty = allocator.buffer(capacity: 0)
  312. return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
  313. }
  314. case ._modifying:
  315. preconditionFailure("Left in modifying state")
  316. }
  317. }
  318. }
  319. // MARK: - Outbound
  320. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  321. private mutating func processResponseTrailers(
  322. _ trailers: HPACKHeaders,
  323. promise: EventLoopPromise<Void>?,
  324. allocator: ByteBufferAllocator
  325. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  326. switch self {
  327. case .idle:
  328. preconditionFailure("Invalid state: haven't received request head")
  329. case .fullyOpen(let inbound, var outbound):
  330. return self.withStateAvoidingCoWs { state in
  331. // Double check these are trailers.
  332. assert(outbound.responseHeadersSent)
  333. // We haven't seen the end of the request stream yet.
  334. state = .clientOpenServerClosed(inbound)
  335. // Avoid CoW-ing the buffers.
  336. let responseBuffers = outbound.responseBuffer
  337. outbound.responseBuffer = nil
  338. return Self.processTrailers(
  339. responseBuffers: responseBuffers,
  340. trailers: trailers,
  341. promise: promise,
  342. allocator: allocator,
  343. closeChannel: outbound.closeConnection
  344. )
  345. }
  346. case var .clientClosedServerOpen(state):
  347. return self.withStateAvoidingCoWs { nextState in
  348. // Client is closed and now so is the server.
  349. nextState = .idle
  350. // Avoid CoW-ing the buffers.
  351. let responseBuffers = state.responseBuffer
  352. state.responseBuffer = nil
  353. return Self.processTrailers(
  354. responseBuffers: responseBuffers,
  355. trailers: trailers,
  356. promise: promise,
  357. allocator: allocator,
  358. closeChannel: state.closeConnection
  359. )
  360. }
  361. case .clientOpenServerClosed:
  362. preconditionFailure("Already seen end of response stream")
  363. case ._modifying:
  364. preconditionFailure("Left in modifying state")
  365. }
  366. }
  367. private static func processTrailers(
  368. responseBuffers: CircularBuffer<ByteBuffer>?,
  369. trailers: HPACKHeaders,
  370. promise: EventLoopPromise<Void>?,
  371. allocator: ByteBufferAllocator,
  372. closeChannel: Bool
  373. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  374. if var responseBuffers = responseBuffers {
  375. let buffer = GRPCWebToHTTP2ServerCodec.encodeResponsesAndTrailers(
  376. &responseBuffers,
  377. trailers: trailers,
  378. allocator: allocator
  379. )
  380. return .write(
  381. .init(
  382. part: .body(.byteBuffer(buffer)),
  383. additionalPart: .end(nil),
  384. promise: promise,
  385. closeChannel: closeChannel
  386. )
  387. )
  388. } else {
  389. // No response buffer; plain gRPC Web.
  390. let trailers = HTTPHeaders(hpackHeaders: trailers)
  391. return .write(.init(part: .end(trailers), promise: promise, closeChannel: closeChannel))
  392. }
  393. }
  394. private mutating func processResponseTrailersOnly(
  395. _ trailers: HPACKHeaders,
  396. promise: EventLoopPromise<Void>?
  397. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  398. switch self {
  399. case .idle:
  400. preconditionFailure("Invalid state: haven't received request head")
  401. case let .fullyOpen(inbound, outbound):
  402. return self.withStateAvoidingCoWs { state in
  403. // We still haven't seen the end of the request stream.
  404. state = .clientOpenServerClosed(inbound)
  405. let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
  406. hpackHeaders: trailers,
  407. closeConnection: outbound.closeConnection
  408. )
  409. return .write(
  410. .init(
  411. part: .head(head),
  412. additionalPart: .end(nil),
  413. promise: promise,
  414. closeChannel: outbound.closeConnection
  415. )
  416. )
  417. }
  418. case let .clientClosedServerOpen(outbound):
  419. return self.withStateAvoidingCoWs { state in
  420. // We're done, back to idle.
  421. state = .idle
  422. let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
  423. hpackHeaders: trailers,
  424. closeConnection: outbound.closeConnection
  425. )
  426. return .write(
  427. .init(
  428. part: .head(head),
  429. additionalPart: .end(nil),
  430. promise: promise,
  431. closeChannel: outbound.closeConnection
  432. )
  433. )
  434. }
  435. case .clientOpenServerClosed:
  436. preconditionFailure("Already seen end of response stream")
  437. case ._modifying:
  438. preconditionFailure("Left in modifying state")
  439. }
  440. }
  441. private mutating func processResponseHeaders(
  442. _ headers: HPACKHeaders,
  443. promise: EventLoopPromise<Void>?
  444. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  445. switch self {
  446. case .idle:
  447. preconditionFailure("Invalid state: haven't received request head")
  448. case .fullyOpen(let inbound, var outbound):
  449. return self.withStateAvoidingCoWs { state in
  450. outbound.responseHeadersSent = true
  451. state = .fullyOpen(inbound, outbound)
  452. let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
  453. hpackHeaders: headers,
  454. closeConnection: outbound.closeConnection
  455. )
  456. return .write(.init(part: .head(head), promise: promise, closeChannel: false))
  457. }
  458. case var .clientClosedServerOpen(outbound):
  459. return self.withStateAvoidingCoWs { state in
  460. outbound.responseHeadersSent = true
  461. state = .clientClosedServerOpen(outbound)
  462. let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
  463. hpackHeaders: headers,
  464. closeConnection: outbound.closeConnection
  465. )
  466. return .write(.init(part: .head(head), promise: promise, closeChannel: false))
  467. }
  468. case .clientOpenServerClosed:
  469. preconditionFailure("Already seen end of response stream")
  470. case ._modifying:
  471. preconditionFailure("Left in modifying state")
  472. }
  473. }
  474. private mutating func processResponseHeaders(
  475. _ payload: HTTP2Frame.FramePayload.Headers,
  476. promise: EventLoopPromise<Void>?,
  477. allocator: ByteBufferAllocator
  478. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  479. switch self {
  480. case .idle:
  481. preconditionFailure("Invalid state: haven't received request head")
  482. case let .fullyOpen(_, outbound),
  483. let .clientClosedServerOpen(outbound):
  484. if outbound.responseHeadersSent {
  485. // Headers have been sent, these must be trailers, so end stream must be set.
  486. assert(payload.endStream)
  487. return self.processResponseTrailers(payload.headers, promise: promise, allocator: allocator)
  488. } else if payload.endStream {
  489. // Headers haven't been sent yet and end stream is set: this is a trailers only response
  490. // so we need to send 'end' as well.
  491. return self.processResponseTrailersOnly(payload.headers, promise: promise)
  492. } else {
  493. return self.processResponseHeaders(payload.headers, promise: promise)
  494. }
  495. case .clientOpenServerClosed:
  496. // We've already sent end.
  497. return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
  498. case ._modifying:
  499. preconditionFailure("Left in modifying state")
  500. }
  501. }
  502. private static func processResponseData(
  503. _ payload: HTTP2Frame.FramePayload.Data,
  504. promise: EventLoopPromise<Void>?,
  505. state: inout GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState
  506. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  507. if state.responseBuffer == nil {
  508. // Not gRPC Web Text; just write the body.
  509. return .write(.init(part: .body(payload.data), promise: promise, closeChannel: false))
  510. } else {
  511. switch payload.data {
  512. case let .byteBuffer(buffer):
  513. // '!' is fine, we checked above.
  514. state.responseBuffer!.append(buffer)
  515. case .fileRegion:
  516. preconditionFailure("Unexpected IOData.fileRegion")
  517. }
  518. // The response is buffered, we can consider it dealt with.
  519. return .completePromise(promise, .success(()))
  520. }
  521. }
  522. private mutating func processResponseData(
  523. _ payload: HTTP2Frame.FramePayload.Data,
  524. promise: EventLoopPromise<Void>?
  525. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  526. switch self {
  527. case .idle:
  528. preconditionFailure("Invalid state: haven't received request head")
  529. case .fullyOpen(let inbound, var outbound):
  530. return self.withStateAvoidingCoWs { state in
  531. let action = Self.processResponseData(payload, promise: promise, state: &outbound)
  532. state = .fullyOpen(inbound, outbound)
  533. return action
  534. }
  535. case var .clientClosedServerOpen(outbound):
  536. return self.withStateAvoidingCoWs { state in
  537. let action = Self.processResponseData(payload, promise: promise, state: &outbound)
  538. state = .clientClosedServerOpen(outbound)
  539. return action
  540. }
  541. case .clientOpenServerClosed:
  542. return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
  543. case ._modifying:
  544. preconditionFailure("Left in modifying state")
  545. }
  546. }
  547. }
  548. // MARK: - Helpers
  549. extension GRPCWebToHTTP2ServerCodec {
  550. private static func makeResponseHead(
  551. hpackHeaders: HPACKHeaders,
  552. closeConnection: Bool
  553. ) -> HTTPResponseHead {
  554. var headers = HTTPHeaders(hpackHeaders: hpackHeaders)
  555. if closeConnection {
  556. headers.add(name: "connection", value: "close")
  557. }
  558. // Grab the status, if this is missing we've messed up in another handler.
  559. guard let statusCode = hpackHeaders.first(name: ":status").flatMap(Int.init) else {
  560. preconditionFailure("Invalid state: missing ':status' pseudo header")
  561. }
  562. return HTTPResponseHead(
  563. version: .init(major: 1, minor: 1),
  564. status: .init(statusCode: statusCode),
  565. headers: headers
  566. )
  567. }
  568. private static func formatTrailers(
  569. _ trailers: HPACKHeaders,
  570. allocator: ByteBufferAllocator
  571. ) -> ByteBuffer {
  572. // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
  573. let encodedTrailers = trailers.map { name, value, _ in
  574. "\(name): \(value)"
  575. }.joined(separator: "\r\n")
  576. var buffer = allocator.buffer(capacity: 5 + encodedTrailers.utf8.count)
  577. // Uncompressed trailer byte.
  578. buffer.writeInteger(UInt8(0x80))
  579. // Length.
  580. buffer.writeInteger(UInt32(encodedTrailers.utf8.count))
  581. // Uncompressed trailers.
  582. buffer.writeString(encodedTrailers)
  583. return buffer
  584. }
  585. private static func encodeResponsesAndTrailers(
  586. _ responses: inout CircularBuffer<ByteBuffer>,
  587. trailers: HPACKHeaders,
  588. allocator: ByteBufferAllocator
  589. ) -> ByteBuffer {
  590. // We need to encode the trailers along with any responses we're holding.
  591. responses.append(self.formatTrailers(trailers, allocator: allocator))
  592. let capacity = responses.lazy.map { $0.readableBytes }.reduce(0, +)
  593. // '!' is fine: responses isn't empty, we just appended the trailers.
  594. var buffer = responses.popFirst()!
  595. // Accumulate all the buffers into a single 'Data'. Ideally we wouldn't copy back and forth
  596. // but this is fine for now.
  597. var accumulatedData = buffer.readData(length: buffer.readableBytes)!
  598. accumulatedData.reserveCapacity(capacity)
  599. while let buffer = responses.popFirst() {
  600. accumulatedData.append(contentsOf: buffer.readableBytesView)
  601. }
  602. // We can reuse the popped buffer.
  603. let base64Encoded = accumulatedData.base64EncodedString()
  604. buffer.clear(minimumCapacity: base64Encoded.utf8.count)
  605. buffer.writeString(base64Encoded)
  606. return buffer
  607. }
  608. }
  609. extension GRPCWebToHTTP2ServerCodec.StateMachine.InboundState {
  610. fileprivate mutating func processInboundData(
  611. buffer: inout ByteBuffer
  612. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  613. if self.requestBuffer == nil {
  614. // We're not dealing with gRPC Web Text: just forward the buffer.
  615. return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
  616. }
  617. if self.requestBuffer!.readableBytes == 0 {
  618. self.requestBuffer = buffer
  619. } else {
  620. self.requestBuffer!.writeBuffer(&buffer)
  621. }
  622. let readableBytes = self.requestBuffer!.readableBytes
  623. // The length of base64 encoded data must be a multiple of 4.
  624. let bytesToRead = readableBytes - (readableBytes % 4)
  625. let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action
  626. if bytesToRead > 0,
  627. let base64Encoded = self.requestBuffer!.readString(length: bytesToRead),
  628. let base64Decoded = Data(base64Encoded: base64Encoded) {
  629. // Recycle the input buffer and restore the request buffer.
  630. buffer.clear()
  631. buffer.writeContiguousBytes(base64Decoded)
  632. action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
  633. } else {
  634. action = .none
  635. }
  636. return action
  637. }
  638. }
  639. extension HTTPHeaders {
  640. fileprivate init(hpackHeaders headers: HPACKHeaders) {
  641. self.init()
  642. self.reserveCapacity(headers.count)
  643. // Pseudo-headers are at the start of the block, so drop them and then add the remaining.
  644. let regularHeaders = headers.drop { name, _, _ in
  645. name.utf8.first == .some(UInt8(ascii: ":"))
  646. }.lazy.map { name, value, _ in
  647. (name, value)
  648. }
  649. self.add(contentsOf: regularHeaders)
  650. }
  651. }