GRPCWebToHTTP2ServerCodec.swift 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHPACK
  18. import NIOHTTP1
  19. import NIOHTTP2
  20. import struct Foundation.Data
  21. /// A codec for translating between gRPC Web (as HTTP/1) and HTTP/2 frame payloads.
  22. internal final class GRPCWebToHTTP2ServerCodec: ChannelDuplexHandler {
  23. internal typealias InboundIn = HTTPServerRequestPart
  24. internal typealias InboundOut = HTTP2Frame.FramePayload
  25. internal typealias OutboundIn = HTTP2Frame.FramePayload
  26. internal typealias OutboundOut = HTTPServerResponsePart
  27. private var stateMachine: StateMachine
  28. /// Create a gRPC Web to server HTTP/2 codec.
  29. ///
  30. /// - Parameter scheme: The value of the ':scheme' pseudo header to insert when converting the
  31. /// request headers.
  32. init(scheme: String) {
  33. self.stateMachine = StateMachine(scheme: scheme)
  34. }
  35. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  36. let action = self.stateMachine.processInbound(
  37. serverRequestPart: self.unwrapInboundIn(data),
  38. allocator: context.channel.allocator
  39. )
  40. self.act(on: action, context: context)
  41. }
  42. internal func write(
  43. context: ChannelHandlerContext,
  44. data: NIOAny,
  45. promise: EventLoopPromise<Void>?
  46. ) {
  47. let action = self.stateMachine.processOutbound(
  48. framePayload: self.unwrapOutboundIn(data),
  49. promise: promise,
  50. allocator: context.channel.allocator
  51. )
  52. self.act(on: action, context: context)
  53. }
  54. /// Acts on an action returned by the state machine.
  55. private func act(on action: StateMachine.Action, context: ChannelHandlerContext) {
  56. switch action {
  57. case .none:
  58. ()
  59. case let .fireChannelRead(payload):
  60. context.fireChannelRead(self.wrapInboundOut(payload))
  61. case let .write(write):
  62. if let additionalPart = write.additionalPart {
  63. context.write(self.wrapOutboundOut(write.part), promise: nil)
  64. context.write(self.wrapOutboundOut(additionalPart), promise: write.promise)
  65. } else {
  66. context.write(self.wrapOutboundOut(write.part), promise: write.promise)
  67. }
  68. if write.closeChannel {
  69. context.close(mode: .all, promise: nil)
  70. }
  71. case let .completePromise(promise, result):
  72. promise?.completeWith(result)
  73. }
  74. }
  75. }
  76. extension GRPCWebToHTTP2ServerCodec {
  77. internal struct StateMachine {
  78. /// The current state.
  79. private var state: State
  80. private let scheme: String
  81. internal init(scheme: String) {
  82. self.state = .idle
  83. self.scheme = scheme
  84. }
  85. /// Process the inbound `HTTPServerRequestPart`.
  86. internal mutating func processInbound(
  87. serverRequestPart: HTTPServerRequestPart,
  88. allocator: ByteBufferAllocator
  89. ) -> Action {
  90. return self.state.processInbound(
  91. serverRequestPart: serverRequestPart,
  92. scheme: self.scheme,
  93. allocator: allocator
  94. )
  95. }
  96. /// Process the outbound `HTTP2Frame.FramePayload`.
  97. internal mutating func processOutbound(
  98. framePayload: HTTP2Frame.FramePayload,
  99. promise: EventLoopPromise<Void>?,
  100. allocator: ByteBufferAllocator
  101. ) -> Action {
  102. return self.state.processOutbound(
  103. framePayload: framePayload,
  104. promise: promise,
  105. allocator: allocator
  106. )
  107. }
  108. /// An action to take as a result of interaction with the state machine.
  109. internal enum Action {
  110. case none
  111. case fireChannelRead(HTTP2Frame.FramePayload)
  112. case write(Write)
  113. case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)
  114. internal struct Write {
  115. internal var part: HTTPServerResponsePart
  116. internal var additionalPart: HTTPServerResponsePart?
  117. internal var promise: EventLoopPromise<Void>?
  118. internal var closeChannel: Bool
  119. internal init(
  120. part: HTTPServerResponsePart,
  121. additionalPart: HTTPServerResponsePart? = nil,
  122. promise: EventLoopPromise<Void>?,
  123. closeChannel: Bool
  124. ) {
  125. self.part = part
  126. self.additionalPart = additionalPart
  127. self.promise = promise
  128. self.closeChannel = closeChannel
  129. }
  130. }
  131. }
  132. fileprivate enum State {
  133. /// Idle; nothing has been received or sent. The only valid transition is to 'fullyOpen' when
  134. /// receiving request headers.
  135. case idle
  136. /// Received request headers. Waiting for the end of request and response streams.
  137. case fullyOpen(InboundState, OutboundState)
  138. /// The server has closed the response stream, we may receive other request parts from the client.
  139. case clientOpenServerClosed(InboundState)
  140. /// The client has sent everything, the server still needs to close the response stream.
  141. case clientClosedServerOpen(OutboundState)
  142. /// Not a real state.
  143. case _modifying
  144. private var isModifying: Bool {
  145. switch self {
  146. case ._modifying:
  147. return true
  148. case .idle, .fullyOpen, .clientClosedServerOpen, .clientOpenServerClosed:
  149. return false
  150. }
  151. }
  152. private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
  153. self = ._modifying
  154. defer {
  155. assert(!self.isModifying)
  156. }
  157. return body(&self)
  158. }
  159. }
  160. fileprivate struct InboundState {
  161. /// A `ByteBuffer` containing the base64 encoded bytes of the request stream if gRPC Web Text
  162. /// is being used, `nil` otherwise.
  163. var requestBuffer: ByteBuffer?
  164. init(isTextEncoded: Bool, allocator: ByteBufferAllocator) {
  165. self.requestBuffer = isTextEncoded ? allocator.buffer(capacity: 0) : nil
  166. }
  167. }
  168. fileprivate struct OutboundState {
  169. /// A `CircularBuffer` holding any response messages if gRPC Web Text is being used, `nil`
  170. /// otherwise.
  171. var responseBuffer: CircularBuffer<ByteBuffer>?
  172. /// True if the response headers have been sent.
  173. var responseHeadersSent: Bool
  174. /// True if the server should close the connection when this request is done.
  175. var closeConnection: Bool
  176. init(isTextEncoded: Bool, closeConnection: Bool) {
  177. self.responseHeadersSent = false
  178. self.responseBuffer = isTextEncoded ? CircularBuffer() : nil
  179. self.closeConnection = closeConnection
  180. }
  181. }
  182. }
  183. }
  184. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  185. fileprivate mutating func processInbound(
  186. serverRequestPart: HTTPServerRequestPart,
  187. scheme: String,
  188. allocator: ByteBufferAllocator
  189. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  190. switch serverRequestPart {
  191. case let .head(head):
  192. return self.processRequestHead(head, scheme: scheme, allocator: allocator)
  193. case var .body(buffer):
  194. return self.processRequestBody(&buffer)
  195. case .end:
  196. return self.processRequestEnd(allocator: allocator)
  197. }
  198. }
  199. fileprivate mutating func processOutbound(
  200. framePayload: HTTP2Frame.FramePayload,
  201. promise: EventLoopPromise<Void>?,
  202. allocator: ByteBufferAllocator
  203. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  204. switch framePayload {
  205. case let .headers(payload):
  206. return self.processResponseHeaders(payload, promise: promise, allocator: allocator)
  207. case let .data(payload):
  208. return self.processResponseData(payload, promise: promise)
  209. case .priority,
  210. .rstStream,
  211. .settings,
  212. .pushPromise,
  213. .ping,
  214. .goAway,
  215. .windowUpdate,
  216. .alternativeService,
  217. .origin:
  218. preconditionFailure("Unsupported frame payload")
  219. }
  220. }
  221. }
  222. // MARK: - Inbound
  223. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  224. private mutating func processRequestHead(
  225. _ head: HTTPRequestHead,
  226. scheme: String,
  227. allocator: ByteBufferAllocator
  228. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  229. switch self {
  230. case .idle:
  231. return self.withStateAvoidingCoWs { state in
  232. let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)
  233. // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to
  234. // allocate a second headers block to use the normalization provided by NIO HTTP/2.
  235. //
  236. // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the
  237. // extra copy.
  238. var headers = HPACKHeaders()
  239. headers.reserveCapacity(normalized.count + 4)
  240. headers.add(name: ":path", value: head.uri)
  241. headers.add(name: ":method", value: head.method.rawValue)
  242. headers.add(name: ":scheme", value: scheme)
  243. if let host = head.headers.first(name: "host") {
  244. headers.add(name: ":authority", value: host)
  245. }
  246. headers.add(contentsOf: normalized)
  247. // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type
  248. // that will be done at the HTTP/2 level.
  249. let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
  250. let isWebText = contentType == .some(.webTextProtobuf)
  251. let closeConnection = head.headers[canonicalForm: "connection"].contains("close")
  252. state = .fullyOpen(
  253. .init(isTextEncoded: isWebText, allocator: allocator),
  254. .init(isTextEncoded: isWebText, closeConnection: closeConnection)
  255. )
  256. return .fireChannelRead(.headers(.init(headers: headers)))
  257. }
  258. case .fullyOpen, .clientOpenServerClosed, .clientClosedServerOpen:
  259. preconditionFailure("Invalid state: already received request head")
  260. case ._modifying:
  261. preconditionFailure("Left in modifying state")
  262. }
  263. }
  264. private mutating func processRequestBody(
  265. _ buffer: inout ByteBuffer
  266. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  267. switch self {
  268. case .idle:
  269. preconditionFailure("Invalid state: haven't received request head")
  270. case .fullyOpen(var inbound, let outbound):
  271. return self.withStateAvoidingCoWs { state in
  272. let action = inbound.processInboundData(buffer: &buffer)
  273. state = .fullyOpen(inbound, outbound)
  274. return action
  275. }
  276. case var .clientOpenServerClosed(inbound):
  277. // The server is already done, but it's not our place to drop the request.
  278. return self.withStateAvoidingCoWs { state in
  279. let action = inbound.processInboundData(buffer: &buffer)
  280. state = .clientOpenServerClosed(inbound)
  281. return action
  282. }
  283. case .clientClosedServerOpen:
  284. preconditionFailure("End of request stream already received")
  285. case ._modifying:
  286. preconditionFailure("Left in modifying state")
  287. }
  288. }
  289. private mutating func processRequestEnd(
  290. allocator: ByteBufferAllocator
  291. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  292. switch self {
  293. case .idle:
  294. preconditionFailure("Invalid state: haven't received request head")
  295. case let .fullyOpen(_, outbound):
  296. return self.withStateAvoidingCoWs { state in
  297. // We're done with inbound state.
  298. state = .clientClosedServerOpen(outbound)
  299. // Send an empty DATA frame with the end stream flag set.
  300. let empty = allocator.buffer(capacity: 0)
  301. return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
  302. }
  303. case .clientClosedServerOpen:
  304. preconditionFailure("End of request stream already received")
  305. case .clientOpenServerClosed:
  306. return self.withStateAvoidingCoWs { state in
  307. // Both sides are closed now, back to idle. Don't forget to pass on the .end, as
  308. // it's necessary to communicate to the other peers that the response is done.
  309. state = .idle
  310. // Send an empty DATA frame with the end stream flag set.
  311. let empty = allocator.buffer(capacity: 0)
  312. return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
  313. }
  314. case ._modifying:
  315. preconditionFailure("Left in modifying state")
  316. }
  317. }
  318. }
  319. // MARK: - Outbound
  320. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  321. private mutating func processResponseTrailers(
  322. _ trailers: HPACKHeaders,
  323. promise: EventLoopPromise<Void>?,
  324. allocator: ByteBufferAllocator
  325. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  326. switch self {
  327. case .idle:
  328. preconditionFailure("Invalid state: haven't received request head")
  329. case .fullyOpen(let inbound, var outbound):
  330. return self.withStateAvoidingCoWs { state in
  331. // Double check these are trailers.
  332. assert(outbound.responseHeadersSent)
  333. // We haven't seen the end of the request stream yet.
  334. state = .clientOpenServerClosed(inbound)
  335. // Avoid CoW-ing the buffers.
  336. let responseBuffers = outbound.responseBuffer
  337. outbound.responseBuffer = nil
  338. return Self.processTrailers(
  339. responseBuffers: responseBuffers,
  340. trailers: trailers,
  341. promise: promise,
  342. allocator: allocator,
  343. closeChannel: outbound.closeConnection
  344. )
  345. }
  346. case var .clientClosedServerOpen(state):
  347. return self.withStateAvoidingCoWs { nextState in
  348. // Client is closed and now so is the server.
  349. nextState = .idle
  350. // Avoid CoW-ing the buffers.
  351. let responseBuffers = state.responseBuffer
  352. state.responseBuffer = nil
  353. return Self.processTrailers(
  354. responseBuffers: responseBuffers,
  355. trailers: trailers,
  356. promise: promise,
  357. allocator: allocator,
  358. closeChannel: state.closeConnection
  359. )
  360. }
  361. case .clientOpenServerClosed:
  362. preconditionFailure("Already seen end of response stream")
  363. case ._modifying:
  364. preconditionFailure("Left in modifying state")
  365. }
  366. }
  367. private static func processTrailers(
  368. responseBuffers: CircularBuffer<ByteBuffer>?,
  369. trailers: HPACKHeaders,
  370. promise: EventLoopPromise<Void>?,
  371. allocator: ByteBufferAllocator,
  372. closeChannel: Bool
  373. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  374. if var responseBuffers = responseBuffers {
  375. let buffer = GRPCWebToHTTP2ServerCodec.encodeResponsesAndTrailers(
  376. &responseBuffers,
  377. trailers: trailers,
  378. allocator: allocator
  379. )
  380. return .write(
  381. .init(
  382. part: .body(.byteBuffer(buffer)),
  383. additionalPart: .end(nil),
  384. promise: promise,
  385. closeChannel: closeChannel
  386. )
  387. )
  388. } else {
  389. // No response buffer; plain gRPC Web. Trailers are encoded into the body as a regular
  390. // length-prefixed message.
  391. let buffer = GRPCWebToHTTP2ServerCodec.formatTrailers(trailers, allocator: allocator)
  392. return .write(
  393. .init(
  394. part: .body(.byteBuffer(buffer)),
  395. additionalPart: .end(nil),
  396. promise: promise,
  397. closeChannel: closeChannel
  398. )
  399. )
  400. }
  401. }
  402. private mutating func processResponseTrailersOnly(
  403. _ trailers: HPACKHeaders,
  404. promise: EventLoopPromise<Void>?
  405. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  406. switch self {
  407. case .idle:
  408. preconditionFailure("Invalid state: haven't received request head")
  409. case let .fullyOpen(inbound, outbound):
  410. return self.withStateAvoidingCoWs { state in
  411. // We still haven't seen the end of the request stream.
  412. state = .clientOpenServerClosed(inbound)
  413. let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
  414. hpackHeaders: trailers,
  415. closeConnection: outbound.closeConnection
  416. )
  417. return .write(
  418. .init(
  419. part: .head(head),
  420. additionalPart: .end(nil),
  421. promise: promise,
  422. closeChannel: outbound.closeConnection
  423. )
  424. )
  425. }
  426. case let .clientClosedServerOpen(outbound):
  427. return self.withStateAvoidingCoWs { state in
  428. // We're done, back to idle.
  429. state = .idle
  430. let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
  431. hpackHeaders: trailers,
  432. closeConnection: outbound.closeConnection
  433. )
  434. return .write(
  435. .init(
  436. part: .head(head),
  437. additionalPart: .end(nil),
  438. promise: promise,
  439. closeChannel: outbound.closeConnection
  440. )
  441. )
  442. }
  443. case .clientOpenServerClosed:
  444. preconditionFailure("Already seen end of response stream")
  445. case ._modifying:
  446. preconditionFailure("Left in modifying state")
  447. }
  448. }
  449. private mutating func processResponseHeaders(
  450. _ headers: HPACKHeaders,
  451. promise: EventLoopPromise<Void>?
  452. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  453. switch self {
  454. case .idle:
  455. preconditionFailure("Invalid state: haven't received request head")
  456. case .fullyOpen(let inbound, var outbound):
  457. return self.withStateAvoidingCoWs { state in
  458. outbound.responseHeadersSent = true
  459. state = .fullyOpen(inbound, outbound)
  460. let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
  461. hpackHeaders: headers,
  462. closeConnection: outbound.closeConnection
  463. )
  464. return .write(.init(part: .head(head), promise: promise, closeChannel: false))
  465. }
  466. case var .clientClosedServerOpen(outbound):
  467. return self.withStateAvoidingCoWs { state in
  468. outbound.responseHeadersSent = true
  469. state = .clientClosedServerOpen(outbound)
  470. let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(
  471. hpackHeaders: headers,
  472. closeConnection: outbound.closeConnection
  473. )
  474. return .write(.init(part: .head(head), promise: promise, closeChannel: false))
  475. }
  476. case .clientOpenServerClosed:
  477. preconditionFailure("Already seen end of response stream")
  478. case ._modifying:
  479. preconditionFailure("Left in modifying state")
  480. }
  481. }
  482. private mutating func processResponseHeaders(
  483. _ payload: HTTP2Frame.FramePayload.Headers,
  484. promise: EventLoopPromise<Void>?,
  485. allocator: ByteBufferAllocator
  486. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  487. switch self {
  488. case .idle:
  489. preconditionFailure("Invalid state: haven't received request head")
  490. case let .fullyOpen(_, outbound),
  491. let .clientClosedServerOpen(outbound):
  492. if outbound.responseHeadersSent {
  493. // Headers have been sent, these must be trailers, so end stream must be set.
  494. assert(payload.endStream)
  495. return self.processResponseTrailers(payload.headers, promise: promise, allocator: allocator)
  496. } else if payload.endStream {
  497. // Headers haven't been sent yet and end stream is set: this is a trailers only response
  498. // so we need to send 'end' as well.
  499. return self.processResponseTrailersOnly(payload.headers, promise: promise)
  500. } else {
  501. return self.processResponseHeaders(payload.headers, promise: promise)
  502. }
  503. case .clientOpenServerClosed:
  504. // We've already sent end.
  505. return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
  506. case ._modifying:
  507. preconditionFailure("Left in modifying state")
  508. }
  509. }
  510. private static func processResponseData(
  511. _ payload: HTTP2Frame.FramePayload.Data,
  512. promise: EventLoopPromise<Void>?,
  513. state: inout GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState
  514. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  515. if state.responseBuffer == nil {
  516. // Not gRPC Web Text; just write the body.
  517. return .write(.init(part: .body(payload.data), promise: promise, closeChannel: false))
  518. } else {
  519. switch payload.data {
  520. case let .byteBuffer(buffer):
  521. // '!' is fine, we checked above.
  522. state.responseBuffer!.append(buffer)
  523. case .fileRegion:
  524. preconditionFailure("Unexpected IOData.fileRegion")
  525. }
  526. // The response is buffered, we can consider it dealt with.
  527. return .completePromise(promise, .success(()))
  528. }
  529. }
  530. private mutating func processResponseData(
  531. _ payload: HTTP2Frame.FramePayload.Data,
  532. promise: EventLoopPromise<Void>?
  533. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  534. switch self {
  535. case .idle:
  536. preconditionFailure("Invalid state: haven't received request head")
  537. case .fullyOpen(let inbound, var outbound):
  538. return self.withStateAvoidingCoWs { state in
  539. let action = Self.processResponseData(payload, promise: promise, state: &outbound)
  540. state = .fullyOpen(inbound, outbound)
  541. return action
  542. }
  543. case var .clientClosedServerOpen(outbound):
  544. return self.withStateAvoidingCoWs { state in
  545. let action = Self.processResponseData(payload, promise: promise, state: &outbound)
  546. state = .clientClosedServerOpen(outbound)
  547. return action
  548. }
  549. case .clientOpenServerClosed:
  550. return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
  551. case ._modifying:
  552. preconditionFailure("Left in modifying state")
  553. }
  554. }
  555. }
  556. // MARK: - Helpers
  557. extension GRPCWebToHTTP2ServerCodec {
  558. private static func makeResponseHead(
  559. hpackHeaders: HPACKHeaders,
  560. closeConnection: Bool
  561. ) -> HTTPResponseHead {
  562. var headers = HTTPHeaders(hpackHeaders: hpackHeaders)
  563. if closeConnection {
  564. headers.add(name: "connection", value: "close")
  565. }
  566. // Grab the status, if this is missing we've messed up in another handler.
  567. guard let statusCode = hpackHeaders.first(name: ":status").flatMap(Int.init) else {
  568. preconditionFailure("Invalid state: missing ':status' pseudo header")
  569. }
  570. return HTTPResponseHead(
  571. version: .init(major: 1, minor: 1),
  572. status: .init(statusCode: statusCode),
  573. headers: headers
  574. )
  575. }
  576. private static func formatTrailers(
  577. _ trailers: HPACKHeaders,
  578. allocator: ByteBufferAllocator
  579. ) -> ByteBuffer {
  580. // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
  581. let length = trailers.reduce(0) { partial, trailer in
  582. // +4 for: ":", " ", "\r", "\n"
  583. return partial + trailer.name.utf8.count + trailer.value.utf8.count + 4
  584. }
  585. var buffer = allocator.buffer(capacity: 5 + length)
  586. // Uncompressed trailer byte.
  587. buffer.writeInteger(UInt8(0x80))
  588. // Length.
  589. let lengthIndex = buffer.writerIndex
  590. buffer.writeInteger(UInt32(0))
  591. var bytesWritten = 0
  592. for (name, value, _) in trailers {
  593. bytesWritten += buffer.writeString(name)
  594. bytesWritten += buffer.writeString(": ")
  595. bytesWritten += buffer.writeString(value)
  596. bytesWritten += buffer.writeString("\r\n")
  597. }
  598. buffer.setInteger(UInt32(bytesWritten), at: lengthIndex)
  599. return buffer
  600. }
  601. private static func encodeResponsesAndTrailers(
  602. _ responses: inout CircularBuffer<ByteBuffer>,
  603. trailers: HPACKHeaders,
  604. allocator: ByteBufferAllocator
  605. ) -> ByteBuffer {
  606. // We need to encode the trailers along with any responses we're holding.
  607. responses.append(self.formatTrailers(trailers, allocator: allocator))
  608. let capacity = responses.lazy.map { $0.readableBytes }.reduce(0, +)
  609. // '!' is fine: responses isn't empty, we just appended the trailers.
  610. var buffer = responses.popFirst()!
  611. // Accumulate all the buffers into a single 'Data'. Ideally we wouldn't copy back and forth
  612. // but this is fine for now.
  613. var accumulatedData = buffer.readData(length: buffer.readableBytes)!
  614. accumulatedData.reserveCapacity(capacity)
  615. while let buffer = responses.popFirst() {
  616. accumulatedData.append(contentsOf: buffer.readableBytesView)
  617. }
  618. // We can reuse the popped buffer.
  619. let base64Encoded = accumulatedData.base64EncodedString()
  620. buffer.clear(minimumCapacity: base64Encoded.utf8.count)
  621. buffer.writeString(base64Encoded)
  622. return buffer
  623. }
  624. }
  625. extension GRPCWebToHTTP2ServerCodec.StateMachine.InboundState {
  626. fileprivate mutating func processInboundData(
  627. buffer: inout ByteBuffer
  628. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  629. if self.requestBuffer == nil {
  630. // We're not dealing with gRPC Web Text: just forward the buffer.
  631. return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
  632. }
  633. if self.requestBuffer!.readableBytes == 0 {
  634. self.requestBuffer = buffer
  635. } else {
  636. self.requestBuffer!.writeBuffer(&buffer)
  637. }
  638. let readableBytes = self.requestBuffer!.readableBytes
  639. // The length of base64 encoded data must be a multiple of 4.
  640. let bytesToRead = readableBytes - (readableBytes % 4)
  641. let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action
  642. if bytesToRead > 0,
  643. let base64Encoded = self.requestBuffer!.readString(length: bytesToRead),
  644. let base64Decoded = Data(base64Encoded: base64Encoded)
  645. {
  646. // Recycle the input buffer and restore the request buffer.
  647. buffer.clear()
  648. buffer.writeContiguousBytes(base64Decoded)
  649. action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
  650. } else {
  651. action = .none
  652. }
  653. return action
  654. }
  655. }
  656. extension HTTPHeaders {
  657. fileprivate init(hpackHeaders headers: HPACKHeaders) {
  658. self.init()
  659. self.reserveCapacity(headers.count)
  660. // Pseudo-headers are at the start of the block, so drop them and then add the remaining.
  661. let regularHeaders = headers.drop { name, _, _ in
  662. name.utf8.first == .some(UInt8(ascii: ":"))
  663. }.lazy.map { name, value, _ in
  664. (name, value)
  665. }
  666. self.add(contentsOf: regularHeaders)
  667. }
  668. }