GRPCWebToHTTP2ServerCodec.swift 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. /*
  2. * Copyright 2020, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import struct Foundation.Data
  17. import NIO
  18. import NIOHPACK
  19. import NIOHTTP1
  20. import NIOHTTP2
  21. /// A codec for translating between gRPC Web (as HTTP/1) and HTTP/2 frame payloads.
  22. internal final class GRPCWebToHTTP2ServerCodec: ChannelDuplexHandler {
  23. internal typealias InboundIn = HTTPServerRequestPart
  24. internal typealias InboundOut = HTTP2Frame.FramePayload
  25. internal typealias OutboundIn = HTTP2Frame.FramePayload
  26. internal typealias OutboundOut = HTTPServerResponsePart
  27. private var stateMachine: StateMachine
  28. /// Create a gRPC Web to server HTTP/2 codec.
  29. ///
  30. /// - Parameter scheme: The value of the ':scheme' pseudo header to insert when converting the
  31. /// request headers.
  32. init(scheme: String) {
  33. self.stateMachine = StateMachine(scheme: scheme)
  34. }
  35. internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  36. let action = self.stateMachine.processInbound(
  37. serverRequestPart: self.unwrapInboundIn(data),
  38. allocator: context.channel.allocator
  39. )
  40. self.act(on: action, context: context)
  41. }
  42. internal func write(
  43. context: ChannelHandlerContext,
  44. data: NIOAny,
  45. promise: EventLoopPromise<Void>?
  46. ) {
  47. let action = self.stateMachine.processOutbound(
  48. framePayload: self.unwrapOutboundIn(data),
  49. promise: promise,
  50. allocator: context.channel.allocator
  51. )
  52. self.act(on: action, context: context)
  53. }
  54. /// Acts on an action returned by the state machine.
  55. private func act(on action: StateMachine.Action, context: ChannelHandlerContext) {
  56. switch action {
  57. case .none:
  58. ()
  59. case let .fireChannelRead(payload):
  60. context.fireChannelRead(self.wrapInboundOut(payload))
  61. case let .write(part1, part2, promise):
  62. if let part2 = part2 {
  63. context.write(self.wrapOutboundOut(part1), promise: nil)
  64. context.write(self.wrapOutboundOut(part2), promise: promise)
  65. } else {
  66. context.write(self.wrapOutboundOut(part1), promise: promise)
  67. }
  68. case let .completePromise(promise, result):
  69. promise?.completeWith(result)
  70. }
  71. }
  72. }
  73. extension GRPCWebToHTTP2ServerCodec {
  74. struct StateMachine {
  75. /// The current state.
  76. private var state: State
  77. fileprivate init(scheme: String) {
  78. self.state = .idle(scheme: scheme)
  79. }
  80. private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action {
  81. var state: State = ._modifying
  82. swap(&self.state, &state)
  83. defer {
  84. swap(&self.state, &state)
  85. }
  86. return body(&state)
  87. }
  88. /// Process the inbound `HTTPServerRequestPart`.
  89. fileprivate mutating func processInbound(
  90. serverRequestPart: HTTPServerRequestPart,
  91. allocator: ByteBufferAllocator
  92. ) -> Action {
  93. return self.withStateAvoidingCoWs { state in
  94. state.processInbound(serverRequestPart: serverRequestPart, allocator: allocator)
  95. }
  96. }
  97. /// Process the outbound `HTTP2Frame.FramePayload`.
  98. fileprivate mutating func processOutbound(
  99. framePayload: HTTP2Frame.FramePayload,
  100. promise: EventLoopPromise<Void>?,
  101. allocator: ByteBufferAllocator
  102. ) -> Action {
  103. return self.withStateAvoidingCoWs { state in
  104. state.processOutbound(framePayload: framePayload, promise: promise, allocator: allocator)
  105. }
  106. }
  107. /// An action to take as a result of interaction with the state machine.
  108. fileprivate enum Action {
  109. case none
  110. case fireChannelRead(HTTP2Frame.FramePayload)
  111. case write(HTTPServerResponsePart, HTTPServerResponsePart?, EventLoopPromise<Void>?)
  112. case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)
  113. }
  114. fileprivate enum State {
  115. /// Idle; nothing has been received or sent. The only valid transition is to 'open' when
  116. /// receiving request headers.
  117. case idle(scheme: String)
  118. /// Open; the request headers have been received and we have not sent the end of the response
  119. /// stream.
  120. case open(OpenState)
  121. /// Closed; the response stream (and therefore the request stream) has been closed.
  122. case closed
  123. /// Not a real state.
  124. case _modifying
  125. }
  126. fileprivate struct OpenState {
  127. /// A `ByteBuffer` containing the base64 encoded bytes of the request stream if gRPC Web Text
  128. /// is being used, `nil` otherwise.
  129. var requestBuffer: ByteBuffer?
  130. /// A `CircularBuffer` holding any response messages if gRPC Web Text is being used, `nil`
  131. /// otherwise.
  132. var responseBuffer: CircularBuffer<ByteBuffer>?
  133. /// True if the end of the request stream has been received.
  134. var requestEndSeen: Bool
  135. /// True if the response headers have been sent.
  136. var responseHeadersSent: Bool
  137. init(isTextEncoded: Bool, allocator: ByteBufferAllocator) {
  138. self.requestEndSeen = false
  139. self.responseHeadersSent = false
  140. if isTextEncoded {
  141. self.requestBuffer = allocator.buffer(capacity: 0)
  142. self.responseBuffer = CircularBuffer()
  143. } else {
  144. self.requestBuffer = nil
  145. self.responseBuffer = nil
  146. }
  147. }
  148. }
  149. }
  150. }
  151. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  152. fileprivate mutating func processInbound(
  153. serverRequestPart: HTTPServerRequestPart,
  154. allocator: ByteBufferAllocator
  155. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  156. switch serverRequestPart {
  157. case let .head(head):
  158. return self.processRequestHead(head, allocator: allocator)
  159. case var .body(buffer):
  160. return self.processRequestBody(&buffer)
  161. case .end:
  162. return self.processRequestEnd(allocator: allocator)
  163. }
  164. }
  165. fileprivate mutating func processOutbound(
  166. framePayload: HTTP2Frame.FramePayload,
  167. promise: EventLoopPromise<Void>?,
  168. allocator: ByteBufferAllocator
  169. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  170. switch framePayload {
  171. case let .headers(payload):
  172. return self.processResponseHeaders(payload, promise: promise, allocator: allocator)
  173. case let .data(payload):
  174. return self.processResponseData(payload, promise: promise)
  175. case .priority,
  176. .rstStream,
  177. .settings,
  178. .pushPromise,
  179. .ping,
  180. .goAway,
  181. .windowUpdate,
  182. .alternativeService,
  183. .origin:
  184. preconditionFailure("Unsupported frame payload")
  185. }
  186. }
  187. }
  188. // MARK: - Inbound
  189. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  190. private mutating func processRequestHead(
  191. _ head: HTTPRequestHead,
  192. allocator: ByteBufferAllocator
  193. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  194. switch self {
  195. case let .idle(scheme):
  196. let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true)
  197. // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to
  198. // allocate a second headers block to use the normalization provided by NIO HTTP/2.
  199. //
  200. // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the
  201. // extra copy.
  202. var headers = HPACKHeaders()
  203. headers.reserveCapacity(normalized.count + 4)
  204. headers.add(name: ":path", value: head.uri)
  205. headers.add(name: ":method", value: head.method.rawValue)
  206. headers.add(name: ":scheme", value: scheme)
  207. if let host = head.headers.first(name: "host") {
  208. headers.add(name: ":authority", value: host)
  209. }
  210. headers.add(contentsOf: normalized)
  211. // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type
  212. // that will be done at the HTTP/2 level.
  213. let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init)
  214. let isWebText = contentType == .some(.webTextProtobuf)
  215. self = .open(.init(isTextEncoded: isWebText, allocator: allocator))
  216. return .fireChannelRead(.headers(.init(headers: headers)))
  217. case .open, .closed:
  218. preconditionFailure("Invalid state: already received request head")
  219. case ._modifying:
  220. preconditionFailure("Left in modifying state")
  221. }
  222. }
  223. private mutating func processRequestBody(
  224. _ buffer: inout ByteBuffer
  225. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  226. switch self {
  227. case .idle:
  228. preconditionFailure("Invalid state: haven't received request head")
  229. case var .open(state):
  230. assert(!state.requestEndSeen, "Invalid state: request stream closed")
  231. if state.requestBuffer == nil {
  232. // We're not dealing with gRPC Web Text: just forward the buffer.
  233. return .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
  234. }
  235. if state.requestBuffer!.readableBytes == 0 {
  236. state.requestBuffer = buffer
  237. } else {
  238. state.requestBuffer!.writeBuffer(&buffer)
  239. }
  240. let readableBytes = state.requestBuffer!.readableBytes
  241. // The length of base64 encoded data must be a multiple of 4.
  242. let bytesToRead = readableBytes - (readableBytes % 4)
  243. let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action
  244. if bytesToRead > 0,
  245. let base64Encoded = state.requestBuffer!.readString(length: bytesToRead),
  246. let base64Decoded = Data(base64Encoded: base64Encoded) {
  247. // Recycle the input buffer and restore the request buffer.
  248. buffer.clear()
  249. buffer.writeContiguousBytes(base64Decoded)
  250. action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer))))
  251. } else {
  252. action = .none
  253. }
  254. self = .open(state)
  255. return action
  256. case .closed:
  257. return .none
  258. case ._modifying:
  259. preconditionFailure("Left in modifying state")
  260. }
  261. }
  262. private mutating func processRequestEnd(
  263. allocator: ByteBufferAllocator
  264. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  265. switch self {
  266. case .idle:
  267. preconditionFailure("Invalid state: haven't received request head")
  268. case var .open(state):
  269. assert(!state.requestEndSeen, "Invalid state: already seen end stream ")
  270. state.requestEndSeen = true
  271. self = .open(state)
  272. // Send an empty DATA frame with the end stream flag set.
  273. let empty = allocator.buffer(capacity: 0)
  274. return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true)))
  275. case .closed:
  276. return .none
  277. case ._modifying:
  278. preconditionFailure("Left in modifying state")
  279. }
  280. }
  281. }
  282. // MARK: - Outbound
  283. extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
  284. private mutating func processResponseHeaders(
  285. _ payload: HTTP2Frame.FramePayload.Headers,
  286. promise: EventLoopPromise<Void>?,
  287. allocator: ByteBufferAllocator
  288. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  289. switch self {
  290. case .idle:
  291. preconditionFailure("Invalid state: haven't received request head")
  292. case var .open(state):
  293. let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action
  294. if state.responseHeadersSent {
  295. // Headers have been sent, these must be trailers, so end stream must be set.
  296. assert(payload.endStream)
  297. if var responseBuffer = state.responseBuffer {
  298. // We have a response buffer; we're doing gRPC Web Text. Nil out the buffer to avoid CoWs.
  299. state.responseBuffer = nil
  300. let buffer = GRPCWebToHTTP2ServerCodec.encodeResponsesAndTrailers(
  301. &responseBuffer,
  302. trailers: payload.headers,
  303. allocator: allocator
  304. )
  305. self = .closed
  306. action = .write(.body(.byteBuffer(buffer)), .end(nil), promise)
  307. } else {
  308. // No response buffer; plain gRPC Web.
  309. let trailers = HTTPHeaders(hpackHeaders: payload.headers)
  310. self = .closed
  311. action = .write(.end(trailers), nil, promise)
  312. }
  313. } else if payload.endStream {
  314. // Headers haven't been sent yet and end stream is set: this is a trailers only response
  315. // so we need to send 'end' as well.
  316. let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(hpackHeaders: payload.headers)
  317. self = .closed
  318. action = .write(.head(head), .end(nil), promise)
  319. } else {
  320. // Headers haven't been sent, end stream isn't set. Just send response head.
  321. state.responseHeadersSent = true
  322. let head = GRPCWebToHTTP2ServerCodec.makeResponseHead(hpackHeaders: payload.headers)
  323. self = .open(state)
  324. action = .write(.head(head), nil, promise)
  325. }
  326. return action
  327. case .closed:
  328. return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
  329. case ._modifying:
  330. preconditionFailure("Left in modifying state")
  331. }
  332. }
  333. private mutating func processResponseData(
  334. _ payload: HTTP2Frame.FramePayload.Data,
  335. promise: EventLoopPromise<Void>?
  336. ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action {
  337. switch self {
  338. case .idle:
  339. preconditionFailure("Invalid state: haven't received request head")
  340. case var .open(state):
  341. if state.responseBuffer == nil {
  342. // Not gRPC Web Text; just write the body.
  343. return .write(.body(payload.data), nil, promise)
  344. } else {
  345. switch payload.data {
  346. case let .byteBuffer(buffer):
  347. // '!' is fine, we checked above.
  348. state.responseBuffer!.append(buffer)
  349. case .fileRegion:
  350. preconditionFailure("Unexpected IOData.fileRegion")
  351. }
  352. self = .open(state)
  353. // The response is buffered, we can consider it dealt with.
  354. return .completePromise(promise, .success(()))
  355. }
  356. case .closed:
  357. return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
  358. case ._modifying:
  359. preconditionFailure("Left in modifying state")
  360. }
  361. }
  362. }
  363. // MARK: - Helpers
  364. extension GRPCWebToHTTP2ServerCodec {
  365. private static func makeResponseHead(hpackHeaders: HPACKHeaders) -> HTTPResponseHead {
  366. let headers = HTTPHeaders(hpackHeaders: hpackHeaders)
  367. // Grab the status, if this is missing we've messed up in another handler.
  368. guard let statusCode = hpackHeaders.first(name: ":status").flatMap(Int.init) else {
  369. preconditionFailure("Invalid state: missing ':status' pseudo header")
  370. }
  371. return HTTPResponseHead(
  372. version: .init(major: 1, minor: 1),
  373. status: .init(statusCode: statusCode),
  374. headers: headers
  375. )
  376. }
  377. private static func formatTrailers(
  378. _ trailers: HPACKHeaders,
  379. allocator: ByteBufferAllocator
  380. ) -> ByteBuffer {
  381. // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
  382. let encodedTrailers = trailers.map { name, value, _ in
  383. "\(name): \(value)"
  384. }.joined(separator: "\r\n")
  385. var buffer = allocator.buffer(capacity: 5 + encodedTrailers.utf8.count)
  386. // Uncompressed trailer byte.
  387. buffer.writeInteger(UInt8(0x80))
  388. // Length.
  389. buffer.writeInteger(UInt32(encodedTrailers.utf8.count))
  390. // Uncompressed trailers.
  391. buffer.writeString(encodedTrailers)
  392. return buffer
  393. }
  394. private static func encodeResponsesAndTrailers(
  395. _ responses: inout CircularBuffer<ByteBuffer>,
  396. trailers: HPACKHeaders,
  397. allocator: ByteBufferAllocator
  398. ) -> ByteBuffer {
  399. // We need to encode the trailers along with any responses we're holding.
  400. responses.append(self.formatTrailers(trailers, allocator: allocator))
  401. let capacity = responses.lazy.map { $0.readableBytes }.reduce(0, +)
  402. // '!' is fine: responses isn't empty, we just appended the trailers.
  403. var buffer = responses.popFirst()!
  404. // Accumulate all the buffers into a single 'Data'. Ideally we wouldn't copy back and forth
  405. // but this is fine for now.
  406. var accumulatedData = buffer.readData(length: buffer.readableBytes)!
  407. accumulatedData.reserveCapacity(capacity)
  408. while let buffer = responses.popFirst() {
  409. accumulatedData.append(contentsOf: buffer.readableBytesView)
  410. }
  411. // We can reuse the popped buffer.
  412. let base64Encoded = accumulatedData.base64EncodedString()
  413. buffer.clear(minimumCapacity: base64Encoded.utf8.count)
  414. buffer.writeString(base64Encoded)
  415. return buffer
  416. }
  417. }
  418. extension HTTPHeaders {
  419. fileprivate init(hpackHeaders headers: HPACKHeaders) {
  420. self.init()
  421. self.reserveCapacity(headers.count)
  422. // Pseudo-headers are at the start of the block, so drop them and then add the remaining.
  423. let regularHeaders = headers.drop { name, _, _ in
  424. name.utf8.first == .some(UInt8(ascii: ":"))
  425. }.lazy.map { name, value, _ in
  426. (name, value)
  427. }
  428. self.add(contentsOf: regularHeaders)
  429. }
  430. }