_BaseCallHandler.swift 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import SwiftProtobuf
  /// Base class for server call handlers.
  ///
  /// As a `ChannelInboundHandler` it takes request parts read from the `Channel`, runs them through
  /// a small state machine and the interceptor pipeline, and finally hands them to the `observe…`
  /// hooks below. Response parts travel the other way: from the observer, through the interceptor
  /// pipeline, and out to the `Channel`.
  ///
  /// `observeHeaders(_:)`, `observeRequest(_:)`, `observeEnd()` and `observeLibraryError(_:)` trap
  /// unless a subclass overrides them.
  /// - Important: This is **NOT** part of the public API.
  public class _BaseCallHandler<Request, Response>: GRPCCallHandler, ChannelInboundHandler {
    public typealias InboundIn = _GRPCServerRequestPart<Request>
    public typealias OutboundOut = _GRPCServerResponsePart<Response>

    /// The codec handler supplied to `init`.
    public let _codec: ChannelHandler

    /// The interceptor pipeline. Dropped (set to `nil`) when the handler is removed from the
    /// `ChannelPipeline` or the channel becomes inactive.
    private var pipeline: ServerInterceptorPipeline<Request, Response>?

    /// The state machine driving this handler; starts out idle.
    private var state = State.idle

    /// The kind of RPC being handled, e.g. 'unary'.
    private let callType: GRPCCallType

    /// Context handed to us at initialization; the source of the event loop, error delegate,
    /// logger and path used by this handler.
    private let callHandlerContext: CallHandlerContext

    /// The event loop taken from the call handler context.
    internal var eventLoop: EventLoop { self.callHandlerContext.eventLoop }

    /// The error delegate taken from the call handler context, if there is one.
    internal var errorDelegate: ServerErrorDelegate? { self.callHandlerContext.errorDelegate }

    /// The logger taken from the call handler context.
    internal var logger: Logger { self.callHandlerContext.logger }

    /// A reference to the `UserInfo` shared with the interceptor pipeline.
    internal var userInfoRef: Ref<UserInfo>

    /// Creates a call handler and its interceptor pipeline.
    /// - Parameters:
    ///   - callHandlerContext: Provides the logger, event loop, path and error delegate.
    ///   - codec: The codec handler, stored as `_codec`.
    ///   - callType: The type of RPC being handled.
    ///   - interceptors: The interceptors to install in the pipeline.
    internal init(
      callHandlerContext: CallHandlerContext,
      codec: ChannelHandler,
      callType: GRPCCallType,
      interceptors: [ServerInterceptor<Request, Response>]
    ) {
      self._codec = codec
      self.callType = callType
      self.callHandlerContext = callHandlerContext
      self.userInfoRef = Ref(UserInfo())

      // Every stored property is initialized at this point, so methods on `self` may be handed
      // to the pipeline as its callbacks. The pipeline shares our `UserInfo` reference.
      // NOTE(review): the pipeline presumably retains these callbacks (and so `self`) until it is
      // dropped in `handlerRemoved`/`channelInactive` - confirm.
      self.pipeline = ServerInterceptorPipeline(
        logger: callHandlerContext.logger,
        eventLoop: callHandlerContext.eventLoop,
        path: callHandlerContext.path,
        callType: callType,
        userInfoRef: self.userInfoRef,
        interceptors: interceptors,
        onRequestPart: self.receiveRequestPartFromInterceptors(_:),
        onResponsePart: self.sendResponsePartFromInterceptors(_:promise:)
      )
    }

    // MARK: - ChannelHandler

    /// Activates the state machine with the given context.
    public func handlerAdded(context: ChannelHandlerContext) {
      let action = self.state.handlerAdded(context: context)
      self.act(on: action)
    }

    /// Drops the interceptor pipeline.
    public func handlerRemoved(context: ChannelHandlerContext) {
      self.pipeline = nil
    }

    /// Drops the interceptor pipeline and forwards the event.
    public func channelInactive(context: ChannelHandlerContext) {
      self.pipeline = nil
      context.fireChannelInactive()
    }

    /// Feeds the error to the state machine; the error is not forwarded along the pipeline.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
      let action = self.state.errorCaught(error)
      self.act(on: action)
    }

    /// Unwraps the request part and feeds it to the state machine.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      // Nothing is forwarded: we're the last handler.
      let requestPart = self.unwrapInboundIn(data)
      let action = self.state.channelRead(requestPart)
      self.act(on: action)
    }

    // MARK: - Event Observer

    /// Called with the request headers once they have traversed the interceptors.
    internal func observeHeaders(_ headers: HPACKHeaders) {
      fatalError("must be overridden by subclasses")
    }

    /// Called with each request message once it has traversed the interceptors.
    internal func observeRequest(_ message: Request) {
      fatalError("must be overridden by subclasses")
    }

    /// Called when the end of the request stream has traversed the interceptors.
    internal func observeEnd() {
      fatalError("must be overridden by subclasses")
    }

    /// Called when an error is caught from the `Channel` while the handler is active.
    internal func observeLibraryError(_ error: Error) {
      fatalError("must be overridden by subclasses")
    }

    /// Hands a response part produced by an event observer to the interceptor pipeline.
    /// - Parameters:
    ///   - part: The response part to send.
    ///   - promise: A promise to complete once the response part has been written.
    internal func sendResponsePartFromObserver(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      let action = self.state.sendResponsePartFromObserver(part, promise: promise)
      self.act(on: action)
    }

    /// Turns a library error into the `GRPCStatus` and trailers to send back to the client.
    ///
    /// The error delegate (if any) first observes the error. The status is then chosen as follows:
    /// 1. if the delegate transforms the error, its status and trailers (empty if it provided
    ///    none) are used;
    /// 2. otherwise, if the error is `GRPCStatusTransformable`, its status is used with empty
    ///    trailers;
    /// 3. otherwise a generic processing error is used with empty trailers.
    /// - Parameter error: The error to process.
    /// - Returns: The status and trailers to send to the client.
    internal func processLibraryError(_ error: Error) -> (GRPCStatus, HPACKHeaders) {
      self.errorDelegate?.observeLibraryError(error)

      if let transformed = self.errorDelegate?.transformLibraryError(error) {
        return (transformed.status, transformed.trailers ?? [:])
      }

      if let transformable = error as? GRPCStatusTransformable {
        return (transformable.makeGRPCStatus(), [:])
      }

      // We don't know what status to use, so fall back to a generic one.
      return (.processingError, [:])
    }

    /// Turns an error from a request handler into a `GRPCStatus` and the trailers to send to the
    /// peer.
    ///
    /// The error delegate (if any) first observes the error. The status is then chosen as follows:
    /// 1. if the delegate transforms the error its status is used; any trailers it returns have
    ///    the provided `trailers` added to them, otherwise the provided `trailers` are used as is;
    /// 2. otherwise, if the error is `GRPCStatusTransformable`, its status is used;
    /// 3. otherwise a generic processing error is used.
    /// In cases 2 and 3 the provided `trailers` are returned unchanged.
    /// - Parameters:
    ///   - error: The error to process.
    ///   - headers: Headers passed to the error delegate alongside the error.
    ///   - trailers: Trailers to include in the result.
    /// - Returns: The status and trailers to send to the peer.
    internal func processObserverError(
      _ error: Error,
      headers: HPACKHeaders,
      trailers: HPACKHeaders
    ) -> (GRPCStatus, HPACKHeaders) {
      self.errorDelegate?.observeRequestHandlerError(error, headers: headers)

      if let transformed = self.errorDelegate?.transformRequestHandlerError(error, headers: headers) {
        guard var combined = transformed.trailers else {
          // The delegate had no trailers of its own.
          return (transformed.status, trailers)
        }
        // The delegate returned trailers: add the provided ones as well.
        combined.add(contentsOf: trailers)
        return (transformed.status, combined)
      }

      // No delegate transformation: use the error's own status if it has one, otherwise we don't
      // know what status to use and fall back to a generic one.
      let status = (error as? GRPCStatusTransformable)?.makeGRPCStatus() ?? .processingError
      return (status, trailers)
    }
  }
  // MARK: - Interceptor API

  extension _BaseCallHandler {
    /// Callback for the interceptor pipeline: a request part has reached the end of the pipeline
    /// and should be passed on towards the event observer.
    /// - Parameter part: The request part to forward.
    private func receiveRequestPartFromInterceptors(_ part: GRPCServerRequestPart<Request>) {
      let action = self.state.receiveRequestPartFromInterceptors(part)
      self.act(on: action)
    }

    /// Callback for the interceptor pipeline: a response part has traversed the pipeline and
    /// should be sent via the `Channel`.
    /// - Parameters:
    ///   - part: The response part to send.
    ///   - promise: A promise to complete once the response part has been written.
    private func sendResponsePartFromInterceptors(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      let action = self.state.sendResponsePartFromInterceptors(part, promise: promise)
      self.act(on: action)
    }
  }
  // MARK: - State

  extension _BaseCallHandler {
    /// The lifecycle of the call handler.
    fileprivate enum State {
      /// Not yet added to a pipeline; `handlerAdded(context:)` is the way out of this state.
      case idle

      /// Added to a pipeline; carries the per-call stream states and the handler context.
      case active(ActiveState)

      /// Finished. Terminal: events arriving in this state are ignored (promises are failed).
      case closed
    }
  }
  extension _BaseCallHandler.State {
    /// How far the request and response streams of the RPC have progressed.
    ///
    /// `ActiveState` keeps two of these: one for the boundary between the 'Channel' and the
    /// interceptor pipeline, and one for the boundary between the interceptor pipeline and the
    /// event observer.
    fileprivate enum StreamState {
      case requestIdleResponseIdle
      case requestOpenResponseIdle
      case requestOpenResponseOpen
      case requestClosedResponseIdle
      case requestClosedResponseOpen
      case requestClosedResponseClosed

      /// The verdict for a part: let it through or discard it.
      enum Filter {
        case allow
        case drop
      }

      /// Moves to `next` and allows the part; a `nil` target leaves the state untouched and
      /// drops the part.
      private mutating func advance(to next: StreamState?) -> Filter {
        guard let next = next else {
          return .drop
        }
        self = next
        return .allow
      }

      /// Request headers are only allowed when nothing has been received yet.
      mutating func receiveHeaders() -> Filter {
        let next: StreamState?

        switch self {
        case .requestIdleResponseIdle:
          next = .requestOpenResponseIdle

        case .requestOpenResponseIdle,
             .requestOpenResponseOpen,
             .requestClosedResponseIdle,
             .requestClosedResponseOpen,
             .requestClosedResponseClosed:
          next = nil
        }

        return self.advance(to: next)
      }

      /// Request messages are allowed while the request stream is open. Never changes state.
      func receiveMessage() -> Filter {
        let requestIsOpen: Bool

        switch self {
        case .requestOpenResponseIdle,
             .requestOpenResponseOpen:
          requestIsOpen = true

        case .requestIdleResponseIdle,
             .requestClosedResponseIdle,
             .requestClosedResponseOpen,
             .requestClosedResponseClosed:
          requestIsOpen = false
        }

        return requestIsOpen ? .allow : .drop
      }

      /// The end of the request stream is allowed while the request stream is open, and closes it.
      mutating func receiveEnd() -> Filter {
        let next: StreamState?

        switch self {
        case .requestOpenResponseIdle:
          next = .requestClosedResponseIdle

        case .requestOpenResponseOpen:
          next = .requestClosedResponseOpen

        case .requestIdleResponseIdle,
             .requestClosedResponseIdle,
             .requestClosedResponseOpen,
             .requestClosedResponseClosed:
          next = nil
        }

        return self.advance(to: next)
      }

      /// Response headers are allowed once the request stream has started and the response
      /// stream hasn't; they open the response stream.
      mutating func sendHeaders() -> Filter {
        let next: StreamState?

        switch self {
        case .requestOpenResponseIdle:
          next = .requestOpenResponseOpen

        case .requestClosedResponseIdle:
          next = .requestClosedResponseOpen

        case .requestIdleResponseIdle,
             .requestOpenResponseOpen,
             .requestClosedResponseOpen,
             .requestClosedResponseClosed:
          next = nil
        }

        return self.advance(to: next)
      }

      /// Response messages are allowed while the response stream is open. Never changes state.
      func sendMessage() -> Filter {
        let responseIsOpen: Bool

        switch self {
        case .requestOpenResponseOpen,
             .requestClosedResponseOpen:
          responseIsOpen = true

        case .requestIdleResponseIdle,
             .requestOpenResponseIdle,
             .requestClosedResponseIdle,
             .requestClosedResponseClosed:
          responseIsOpen = false
        }

        return responseIsOpen ? .allow : .drop
      }

      /// The end of the response stream is allowed from any state in which the request stream
      /// has started and the response stream hasn't already ended; both streams end up closed.
      mutating func sendEnd() -> Filter {
        let next: StreamState?

        switch self {
        case .requestOpenResponseIdle,
             .requestOpenResponseOpen,
             .requestClosedResponseIdle,
             .requestClosedResponseOpen:
          next = .requestClosedResponseClosed

        case .requestIdleResponseIdle,
             .requestClosedResponseClosed:
          next = nil
        }

        return self.advance(to: next)
      }
    }

    /// Everything tracked while the handler is in the pipeline.
    fileprivate struct ActiveState {
      /// The context the handler was added with; used for writing responses.
      var context: ChannelHandlerContext

      /// The stream state between the 'Channel' and interceptor pipeline.
      var channelStreamState: StreamState = .requestIdleResponseIdle

      /// The stream state between the interceptor pipeline and event observer.
      var observerStreamState: StreamState = .requestIdleResponseIdle

      /// Both stream states start with the request and response streams idle.
      init(context: ChannelHandlerContext) {
        self.context = context
      }
    }
  }
  extension _BaseCallHandler.State {
    /// What the call handler should do as a result of a state transition. The state machine only
    /// decides; `_BaseCallHandler.act(on:)` carries the action out.
    fileprivate enum Action {
      /// Nothing to do.
      case none

      /// Pass the request part into the interceptor pipeline.
      case receiveRequestPartInInterceptors(GRPCServerRequestPart<Request>)

      /// Pass the request part to the event observer.
      case receiveRequestPartInObserver(GRPCServerRequestPart<Request>)

      /// Pass the error to the event observer.
      case receiveLibraryErrorInObserver(Error)

      /// Pass the response part (and its optional promise) into the interceptor pipeline.
      case sendResponsePartToInterceptors(GRPCServerResponsePart<Response>, EventLoopPromise<Void>?)

      /// Write the response part to the `Channel` using the given context.
      case writeResponsePartToChannel(
        ChannelHandlerContext,
        GRPCServerResponsePart<Response>,
        promise: EventLoopPromise<Void>?
      )

      /// Complete the promise (if there is one) with the result.
      case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)

      /// Carry out each of the actions in order.
      indirect case multiple([Action])
    }
  }
  extension _BaseCallHandler.State {
    /// The handler was added to the `ChannelPipeline`. This is the only transition out of `.idle`
    /// and is expected to happen once; being added while already active is a programmer error.
    internal mutating func handlerAdded(context: ChannelHandlerContext) -> Action {
      switch self {
      case .closed:
        return .none

      case .active:
        preconditionFailure("Invalid state: already active")

      case .idle:
        // Become active, holding on to the context for later writes.
        self = .active(ActiveState(context: context))
        return .none
      }
    }

    /// An error was caught from the `Channel`. It is passed to the observer only while active.
    internal mutating func errorCaught(_ error: Error) -> Action {
      guard case .active = self else {
        // Idle or closed: nothing to tell the observer.
        return .none
      }
      return .receiveLibraryErrorInObserver(error)
    }

    /// A request part was read from the `Channel`. While active, the part is checked against the
    /// channel-side stream state and, if allowed, converted and sent into the interceptor
    /// pipeline. The observer-side state is checked separately once the part leaves the pipeline.
    internal mutating func channelRead(_ requestPart: _GRPCServerRequestPart<Request>) -> Action {
      switch self {
      case .closed:
        return .none

      case .idle:
        preconditionFailure("Invalid state: the handler isn't in the pipeline yet")

      case .active(var active):
        // Park in `.idle` while the extracted state is mutated (intended to avoid copying it),
        // then put it back below.
        self = .idle

        let verdict: StreamState.Filter
        let converted: GRPCServerRequestPart<Request>

        switch requestPart {
        case .headers(let headers):
          verdict = active.channelStreamState.receiveHeaders()
          converted = .metadata(headers)

        case .message(let message):
          verdict = active.channelStreamState.receiveMessage()
          converted = .message(message)

        case .end:
          verdict = active.channelStreamState.receiveEnd()
          converted = .end
        }

        self = .active(active)

        switch verdict {
        case .drop:
          return .none
        case .allow:
          return .receiveRequestPartInInterceptors(converted)
        }
      }
    }

    /// The observer produced a response part. While active, the part is checked against the
    /// observer-side stream state and, if allowed, sent into the interceptor pipeline; otherwise
    /// its promise is failed with `GRPCError.AlreadyComplete`.
    internal mutating func sendResponsePartFromObserver(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) -> Action {
      switch self {
      case .closed:
        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

      case .idle:
        preconditionFailure("Invalid state: the handler isn't in the pipeline yet")

      case .active(var active):
        // Park in `.idle` while the extracted state is mutated, then put it back.
        self = .idle

        let verdict: StreamState.Filter

        switch part {
        case .metadata:
          verdict = active.observerStreamState.sendHeaders()
        case .message:
          verdict = active.observerStreamState.sendMessage()
        case .end:
          verdict = active.observerStreamState.sendEnd()
        }

        self = .active(active)

        switch verdict {
        case .drop:
          return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
        case .allow:
          return .sendResponsePartToInterceptors(part, promise)
        }
      }
    }

    /// A response part has traversed the interceptor pipeline. While active, the part is checked
    /// against the channel-side stream state and, if allowed, written to the `Channel`; otherwise
    /// its promise is failed with `GRPCError.AlreadyComplete`.
    ///
    /// An `.end` part always moves the handler to `.closed`, whether or not it was allowed.
    internal mutating func sendResponsePartFromInterceptors(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) -> Action {
      switch self {
      case .closed:
        // Already closed: fail any promise.
        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))

      case .idle:
        preconditionFailure("Invalid state: can't send response on idle call")

      case .active(var active):
        // Park in `.idle` while the extracted state is mutated.
        self = .idle

        let verdict: StreamState.Filter
        let isEnd: Bool

        switch part {
        case .metadata:
          verdict = active.channelStreamState.sendHeaders()
          isEnd = false

        case .message:
          verdict = active.channelStreamState.sendMessage()
          isEnd = false

        case .end:
          verdict = active.channelStreamState.sendEnd()
          isEnd = true
        }

        // Sending end means we're no longer active; anything else restores the active state.
        self = isEnd ? .closed : .active(active)

        switch verdict {
        case .drop:
          return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
        case .allow:
          return .writeResponsePartToChannel(active.context, part, promise: promise)
        }
      }
    }

    /// A request part has traversed the interceptor pipeline. While active, the part is checked
    /// against the observer-side stream state and, if allowed, delivered to the observer.
    internal mutating func receiveRequestPartFromInterceptors(
      _ part: GRPCServerRequestPart<Request>
    ) -> Action {
      switch self {
      case .closed:
        // Nothing is delivered once closed.
        return .none

      case .idle:
        preconditionFailure("Invalid state: the handler isn't in the pipeline yet")

      case .active(var active):
        // Park in `.idle` while the extracted state is mutated, then put it back.
        self = .idle

        let verdict: StreamState.Filter

        // Does the observer-side stream state accept this part?
        switch part {
        case .metadata:
          verdict = active.observerStreamState.receiveHeaders()
        case .message:
          verdict = active.observerStreamState.receiveMessage()
        case .end:
          verdict = active.observerStreamState.receiveEnd()
        }

        self = .active(active)

        switch verdict {
        case .drop:
          return .none
        case .allow:
          return .receiveRequestPartInObserver(part)
        }
      }
    }
  }
  // MARK: State Actions

  extension _BaseCallHandler {
    /// Carries out an action produced by the state machine.
    private func act(on action: State.Action) {
      switch action {
      case .none:
        break

      case .multiple(let steps):
        // Nested actions are carried out in order.
        for step in steps {
          self.act(on: step)
        }

      case .receiveRequestPartInInterceptors(let part):
        self.receiveRequestPartInInterceptors(part)

      case .receiveRequestPartInObserver(let part):
        self.receiveRequestPartInObserver(part)

      case .receiveLibraryErrorInObserver(let error):
        self.observeLibraryError(error)

      case .sendResponsePartToInterceptors(let part, let promise):
        self.sendResponsePartToInterceptors(part, promise: promise)

      case .writeResponsePartToChannel(let context, let part, let promise):
        self.writeResponsePartToChannel(context: context, part: part, promise: promise)

      case .completePromise(let promise, let result):
        promise?.completeWith(result)
      }
    }

    /// Passes a request part into the interceptor pipeline. Does nothing if the pipeline has
    /// already been dropped.
    private func receiveRequestPartInInterceptors(_ part: GRPCServerRequestPart<Request>) {
      self.pipeline?.receive(part)
    }

    /// Delivers a request part to the observer hook matching the kind of part.
    private func receiveRequestPartInObserver(_ part: GRPCServerRequestPart<Request>) {
      switch part {
      case .metadata(let headers):
        self.observeHeaders(headers)
      case .message(let request):
        self.observeRequest(request)
      case .end:
        self.observeEnd()
      }
    }

    /// Passes a response part into the interceptor pipeline, or fails the promise with
    /// `GRPCError.AlreadyComplete` if the pipeline has already been dropped.
    private func sendResponsePartToInterceptors(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      guard let pipeline = self.pipeline else {
        promise?.fail(GRPCError.AlreadyComplete())
        return
      }
      pipeline.send(part, promise: promise)
    }

    /// Writes a response part to the `Channel`, flushing when appropriate:
    /// - headers are flushed only for RPCs which stream responses (otherwise the flush is left
    ///   to a later part),
    /// - messages are flushed when their metadata asks for it,
    /// - the end of the stream is always flushed.
    private func writeResponsePartToChannel(
      context: ChannelHandlerContext,
      part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      let outbound: OutboundOut
      let shouldFlush: Bool

      switch part {
      case .metadata(let headers):
        outbound = .headers(headers)
        shouldFlush = self.callType.isStreamingResponses

      case .message(let message, let metadata):
        outbound = .message(.init(message, compressed: metadata.compress))
        shouldFlush = metadata.flush

      case .end(let status, let trailers):
        outbound = .statusAndTrailers(status, trailers)
        shouldFlush = true
      }

      context.write(self.wrapOutboundOut(outbound), promise: promise)

      if shouldFlush {
        context.flush()
      }
    }
  }