_BaseCallHandler.swift 20 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import SwiftProtobuf
  /// Provides a means for decoding incoming gRPC messages into protobuf objects.
  ///
  /// Request parts read from the `Channel` are handed to the interceptor pipeline. Parts the
  /// pipeline passes back are forwarded to `observeHeaders(_:)`, `observeRequest(_:)` and
  /// `observeEnd()`; errors caught while the call is active are forwarded to
  /// `observeLibraryError(_:)`. Subclasses must override all four: the implementations here trap.
  ///
  /// - Important: This is **NOT** part of the public API.
  public class _BaseCallHandler<Request, Response>: GRPCCallHandler, ChannelInboundHandler {
    public typealias InboundIn = _GRPCServerRequestPart<Request>
    public typealias OutboundOut = _GRPCServerResponsePart<Response>

    /// The codec handler supplied at initialization.
    public let _codec: ChannelHandler

    /// An interceptor pipeline. Cleared when the handler is removed or the channel becomes inactive.
    private var pipeline: ServerInterceptorPipeline<Request, Response>?

    /// Our current state.
    private var state: State = .idle

    /// The type of this RPC, e.g. 'unary'.
    private let callType: GRPCCallType

    /// Some context provided to us from the routing handler.
    private let callHandlerContext: CallHandlerContext

    /// The event loop this call is being handled on.
    internal var eventLoop: EventLoop {
      return self.callHandlerContext.eventLoop
    }

    /// An error delegate.
    internal var errorDelegate: ServerErrorDelegate? {
      return self.callHandlerContext.errorDelegate
    }

    /// A logger.
    internal var logger: Logger {
      return self.callHandlerContext.logger
    }

    /// Creates a call handler and its interceptor pipeline.
    /// - Parameters:
    ///   - callHandlerContext: Context from the routing handler (event loop, logger, path and
    ///     error delegate are read from it).
    ///   - codec: The codec `ChannelHandler`, exposed as `_codec`.
    ///   - callType: The type of this RPC.
    ///   - interceptors: The interceptors to build the pipeline from.
    internal init(
      callHandlerContext: CallHandlerContext,
      codec: ChannelHandler,
      callType: GRPCCallType,
      interceptors: [ServerInterceptor<Request, Response>]
    ) {
      self.callHandlerContext = callHandlerContext
      self._codec = codec
      self.callType = callType
      // The pipeline is assigned last: its callbacks are methods bound to `self`, which may only
      // be referenced once every other stored property has a value.
      self.pipeline = ServerInterceptorPipeline(
        logger: callHandlerContext.logger,
        eventLoop: callHandlerContext.eventLoop,
        path: callHandlerContext.path,
        callType: callType,
        interceptors: interceptors,
        onRequestPart: self.receiveRequestPartFromInterceptors(_:),
        onResponsePart: self.sendResponsePartFromInterceptors(_:promise:)
      )
    }

    // MARK: - ChannelHandler

    public func handlerAdded(context: ChannelHandlerContext) {
      self.act(on: self.state.handlerAdded(context: context))
    }

    public func handlerRemoved(context: ChannelHandlerContext) {
      // The pipeline was given callbacks bound to `self`; drop it so it no longer refers to us.
      self.pipeline = nil
      // The active state stores the `ChannelHandlerContext`, and a SwiftNIO context keeps a
      // reference to its handler. Move to the terminal state so the stored context is released
      // even if the RPC never sent `.end`; otherwise handler and context keep each other alive.
      // With the pipeline gone, late response parts were already failed with `AlreadyComplete`,
      // which is also what the `.closed` state does.
      self.state = .closed
    }

    public func channelInactive(context: ChannelHandlerContext) {
      self.pipeline = nil
      context.fireChannelInactive()
    }

    public func errorCaught(context: ChannelHandlerContext, error: Error) {
      self.act(on: self.state.errorCaught(error))
    }

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
      let part = self.unwrapInboundIn(data)
      self.act(on: self.state.channelRead(part))
      // We're the last handler. We don't have anything to forward.
    }

    // MARK: - Event Observer

    /// Called with the request headers. Must be overridden by subclasses.
    internal func observeHeaders(_ headers: HPACKHeaders) {
      fatalError("must be overridden by subclasses")
    }

    /// Called with each request message. Must be overridden by subclasses.
    internal func observeRequest(_ message: Request) {
      fatalError("must be overridden by subclasses")
    }

    /// Called when the request stream ends. Must be overridden by subclasses.
    internal func observeEnd() {
      fatalError("must be overridden by subclasses")
    }

    /// Called with an error caught on the `Channel` while active. Must be overridden by subclasses.
    internal func observeLibraryError(_ error: Error) {
      fatalError("must be overridden by subclasses")
    }

    /// Send a response part to the interceptor pipeline. Called by an event observer.
    /// - Parameters:
    ///   - part: The response part to send.
    ///   - promise: A promise to complete once the response part has been written.
    internal func sendResponsePartFromObserver(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      self.act(on: self.state.sendResponsePartFromObserver(part, promise: promise))
    }

    /// Processes a library error to form a `GRPCStatus` and trailers to send back to the client.
    /// - Parameter error: The error to process.
    /// - Returns: The status and trailers to send to the client.
    internal func processLibraryError(_ error: Error) -> (GRPCStatus, HPACKHeaders) {
      // Observe the error if we have a delegate.
      self.errorDelegate?.observeLibraryError(error)

      // What status are we terminating this RPC with?
      // - If we have a delegate and it transforms the error, use its status and its trailers
      //   (or empty trailers if it returned none).
      // - Otherwise try to transform the error to a status ourselves.
      // - Fallback to a generic error.
      let status: GRPCStatus
      let trailers: HPACKHeaders

      if let transformed = self.errorDelegate?.transformLibraryError(error) {
        status = transformed.status
        trailers = transformed.trailers ?? [:]
      } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
        status = grpcStatusTransformable.makeGRPCStatus()
        trailers = [:]
      } else {
        // We don't know what status to use, so use a generic one.
        status = .processingError
        trailers = [:]
      }

      return (status, trailers)
    }

    /// Processes an error, transforming it into a 'GRPCStatus' and any trailers to send to the peer.
    /// - Parameters:
    ///   - error: The error to process.
    ///   - headers: Headers passed to the error delegate along with the error.
    ///   - trailers: Trailers to include in the result; merged with any the delegate returns.
    /// - Returns: The status and trailers to send to the peer.
    internal func processObserverError(
      _ error: Error,
      headers: HPACKHeaders,
      trailers: HPACKHeaders
    ) -> (GRPCStatus, HPACKHeaders) {
      // Observe the error if we have a delegate.
      self.errorDelegate?.observeRequestHandlerError(error, headers: headers)

      // What status are we terminating this RPC with?
      // - If we have a delegate, try transforming the error. If the delegate returns trailers,
      //   merge them with the `trailers` we were given.
      // - Otherwise try to transform the error to a status ourselves.
      // - Fallback to a generic error.
      let status: GRPCStatus
      let mergedTrailers: HPACKHeaders

      if let transformed = self.errorDelegate?.transformRequestHandlerError(error, headers: headers) {
        status = transformed.status
        if var transformedTrailers = transformed.trailers {
          // The delegate returned trailers: append the ones we were given as well.
          transformedTrailers.add(contentsOf: trailers)
          mergedTrailers = transformedTrailers
        } else {
          mergedTrailers = trailers
        }
      } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
        status = grpcStatusTransformable.makeGRPCStatus()
        mergedTrailers = trailers
      } else {
        // We don't know what status to use, so use a generic one.
        status = .processingError
        mergedTrailers = trailers
      }

      return (status, mergedTrailers)
    }
  }
  171. // MARK: - Interceptor API
  extension _BaseCallHandler {
    /// Callback for request parts handed back by the interceptor pipeline. The part is run
    /// through the state machine, which decides whether the event observer gets to see it.
    /// - Parameter part: The request part to forward.
    private func receiveRequestPartFromInterceptors(_ part: GRPCServerRequestPart<Request>) {
      let action = self.state.receiveRequestPartFromInterceptors(part)
      self.act(on: action)
    }

    /// Callback for response parts handed back by the interceptor pipeline. The part is run
    /// through the state machine, which decides whether it is written to the `Channel`.
    /// - Parameters:
    ///   - part: The response part to send.
    ///   - promise: A promise to complete once the response part has been written.
    private func sendResponsePartFromInterceptors(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      let action = self.state.sendResponsePartFromInterceptors(part, promise: promise)
      self.act(on: action)
    }
  }
  190. // MARK: - State
  extension _BaseCallHandler {
    /// The lifecycle of the call handler.
    fileprivate enum State {
      /// Not yet added to a `ChannelPipeline`; the initial state.
      case idle

      /// Added to a pipeline. The payload holds the handler's context and the stream states.
      case active(ActiveState)

      /// Terminal state: incoming events are ignored and promises for response parts are failed.
      case closed
    }
  }
  extension _BaseCallHandler.State {
    /// Combined state of the request stream (idle, open, closed) and the response stream
    /// (idle, open, closed).
    ///
    /// Two instances are kept per call: one for the boundary between the `Channel` and the
    /// interceptor pipeline, and one for the boundary between the interceptor pipeline and the
    /// event observer.
    fileprivate enum StreamState {
      case requestIdleResponseIdle
      case requestOpenResponseIdle
      case requestOpenResponseOpen
      case requestClosedResponseIdle
      case requestClosedResponseOpen
      case requestClosedResponseClosed

      /// Whether a part may pass in the current state.
      enum Filter {
        case allow
        case drop
      }

      /// Request headers arrived: only valid as the very first event; opens the request stream.
      mutating func receiveHeaders() -> Filter {
        switch self {
        case .requestOpenResponseIdle, .requestOpenResponseOpen,
             .requestClosedResponseIdle, .requestClosedResponseOpen,
             .requestClosedResponseClosed:
          return .drop

        case .requestIdleResponseIdle:
          self = .requestOpenResponseIdle
          return .allow
        }
      }

      /// A request message arrived: allowed only while the request stream is open.
      func receiveMessage() -> Filter {
        switch self {
        case .requestIdleResponseIdle,
             .requestClosedResponseIdle, .requestClosedResponseOpen,
             .requestClosedResponseClosed:
          return .drop

        case .requestOpenResponseIdle, .requestOpenResponseOpen:
          return .allow
        }
      }

      /// The request stream ended: allowed only while it is open; closes the request stream.
      mutating func receiveEnd() -> Filter {
        switch self {
        case .requestIdleResponseIdle,
             .requestClosedResponseIdle, .requestClosedResponseOpen,
             .requestClosedResponseClosed:
          return .drop

        case .requestOpenResponseIdle:
          self = .requestClosedResponseIdle

        case .requestOpenResponseOpen:
          self = .requestClosedResponseOpen
        }
        return .allow
      }

      /// Response headers are being sent: allowed once the request stream has started and the
      /// response stream is still idle; opens the response stream.
      mutating func sendHeaders() -> Filter {
        switch self {
        case .requestIdleResponseIdle,
             .requestOpenResponseOpen, .requestClosedResponseOpen,
             .requestClosedResponseClosed:
          return .drop

        case .requestOpenResponseIdle:
          self = .requestOpenResponseOpen

        case .requestClosedResponseIdle:
          self = .requestClosedResponseOpen
        }
        return .allow
      }

      /// A response message is being sent: allowed only while the response stream is open.
      func sendMessage() -> Filter {
        switch self {
        case .requestIdleResponseIdle,
             .requestOpenResponseIdle, .requestClosedResponseIdle,
             .requestClosedResponseClosed:
          return .drop

        case .requestOpenResponseOpen, .requestClosedResponseOpen:
          return .allow
        }
      }

      /// The response is being ended: allowed from any state except the initial one and the
      /// fully closed one; closes both streams.
      mutating func sendEnd() -> Filter {
        switch self {
        case .requestIdleResponseIdle, .requestClosedResponseClosed:
          return .drop

        case .requestOpenResponseIdle, .requestOpenResponseOpen,
             .requestClosedResponseIdle, .requestClosedResponseOpen:
          self = .requestClosedResponseClosed
          return .allow
        }
      }
    }

    /// Everything tracked while the handler is in a pipeline.
    fileprivate struct ActiveState {
      /// The context the handler was added with.
      var context: ChannelHandlerContext

      /// The stream state between the 'Channel' and interceptor pipeline.
      var channelStreamState: StreamState = .requestIdleResponseIdle

      /// The stream state between the interceptor pipeline and event observer.
      var observerStreamState: StreamState = .requestIdleResponseIdle

      /// Starts with both stream states in their initial (idle/idle) state.
      init(context: ChannelHandlerContext) {
        self.context = context
      }
    }
  }
  extension _BaseCallHandler.State {
    /// What the call handler should do after a state transition.
    fileprivate enum Action {
      /// Nothing to do.
      case none

      /// Hand the request part to the interceptor pipeline.
      case receiveRequestPartInInterceptors(GRPCServerRequestPart<Request>)

      /// Hand the request part to the event observer.
      case receiveRequestPartInObserver(GRPCServerRequestPart<Request>)

      /// Hand the error to the event observer.
      case receiveLibraryErrorInObserver(Error)

      /// Hand the response part, and its optional promise, to the interceptor pipeline.
      case sendResponsePartToInterceptors(GRPCServerResponsePart<Response>, EventLoopPromise<Void>?)

      /// Write the response part to the `Channel` using the given context.
      case writeResponsePartToChannel(ChannelHandlerContext, GRPCServerResponsePart<Response>, promise: EventLoopPromise<Void>?)

      /// Complete the promise (if there is one) with the result.
      case completePromise(EventLoopPromise<Void>?, Result<Void, Error>)

      /// Perform several actions, in order.
      indirect case multiple([Action])
    }
  }
  extension _BaseCallHandler.State {
    /// The handler joined a `ChannelPipeline`. This is the only transition out of `.idle`, and
    /// it is expected to happen once: being added while already active is a programmer error.
    internal mutating func handlerAdded(context: ChannelHandlerContext) -> Action {
      switch self {
      case .closed:
        break

      case .active:
        preconditionFailure("Invalid state: already active")

      case .idle:
        self = .active(ActiveState(context: context))
      }
      return .none
    }

    /// An error was caught on the `Channel`. It is only reported to the observer while active.
    internal mutating func errorCaught(_ error: Error) -> Action {
      switch self {
      case .idle, .closed:
        return .none

      case .active:
        return .receiveLibraryErrorInObserver(error)
      }
    }

    /// A request part was read from the `Channel`. It is converted to its interceptor-facing
    /// form and checked against the channel-side stream state before entering the pipeline.
    internal mutating func channelRead(_ requestPart: _GRPCServerRequestPart<Request>) -> Action {
      switch self {
      case .closed:
        return .none

      case .idle:
        preconditionFailure("Invalid state: the handler isn't in the pipeline yet")

      case var .active(active):
        // Park in `.idle` while the extracted value is mutated so `self` holds no second copy.
        self = .idle

        let converted: GRPCServerRequestPart<Request>
        let verdict: StreamState.Filter

        switch requestPart {
        case let .headers(headers):
          converted = .metadata(headers)
          verdict = active.channelStreamState.receiveHeaders()

        case let .message(message):
          converted = .message(message)
          verdict = active.channelStreamState.receiveMessage()

        case .end:
          converted = .end
          verdict = active.channelStreamState.receiveEnd()
        }

        // Store the updated value back.
        self = .active(active)

        switch verdict {
        case .drop:
          return .none
        case .allow:
          return .receiveRequestPartInInterceptors(converted)
        }
      }
    }

    /// The observer produced a response part. It is checked against the observer-side stream
    /// state before entering the interceptor pipeline; rejected parts fail their promise.
    internal mutating func sendResponsePartFromObserver(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) -> Action {
      // Built on demand so the error is only created when a part is actually rejected.
      func rejected() -> Action {
        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
      }

      switch self {
      case .closed:
        return rejected()

      case .idle:
        preconditionFailure("Invalid state: the handler isn't in the pipeline yet")

      case var .active(active):
        // Park in `.idle` while the extracted value is mutated so `self` holds no second copy.
        self = .idle

        let verdict: StreamState.Filter
        switch part {
        case .metadata:
          verdict = active.observerStreamState.sendHeaders()
        case .message:
          verdict = active.observerStreamState.sendMessage()
        case .end:
          verdict = active.observerStreamState.sendEnd()
        }

        // Store the updated value back.
        self = .active(active)

        switch verdict {
        case .drop:
          return rejected()
        case .allow:
          return .sendResponsePartToInterceptors(part, promise)
        }
      }
    }

    /// A response part came out of the interceptor pipeline. It is checked against the
    /// channel-side stream state before being written; rejected parts fail their promise.
    /// An `.end` part always moves the handler to `.closed`, whether or not it is written.
    internal mutating func sendResponsePartFromInterceptors(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) -> Action {
      // Built on demand so the error is only created when a part is actually rejected.
      func rejected() -> Action {
        return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))
      }

      switch self {
      case .closed:
        // Already closed: fail any promise.
        return rejected()

      case .idle:
        preconditionFailure("Invalid state: can't send response on idle call")

      case var .active(active):
        // Park in `.idle` while the extracted value is mutated so `self` holds no second copy.
        self = .idle

        let verdict: StreamState.Filter
        let isEnd: Bool

        switch part {
        case .metadata:
          verdict = active.channelStreamState.sendHeaders()
          isEnd = false
        case .message:
          verdict = active.channelStreamState.sendMessage()
          isEnd = false
        case .end:
          verdict = active.channelStreamState.sendEnd()
          isEnd = true
        }

        // Sending end means we're no longer active; anything else keeps the updated value.
        if isEnd {
          self = .closed
        } else {
          self = .active(active)
        }

        switch verdict {
        case .drop:
          return rejected()
        case .allow:
          return .writeResponsePartToChannel(active.context, part, promise: promise)
        }
      }
    }

    /// A request part came out of the interceptor pipeline. It is checked against the
    /// observer-side stream state before being handed to the observer.
    internal mutating func receiveRequestPartFromInterceptors(
      _ part: GRPCServerRequestPart<Request>
    ) -> Action {
      switch self {
      case .closed:
        // Closed: ignore.
        return .none

      case .idle:
        preconditionFailure("Invalid state: the handler isn't in the pipeline yet")

      case var .active(active):
        // Park in `.idle` while the extracted value is mutated so `self` holds no second copy.
        self = .idle

        // Does the observer-side stream state accept this part?
        let verdict: StreamState.Filter
        switch part {
        case .metadata:
          verdict = active.observerStreamState.receiveHeaders()
        case .message:
          verdict = active.observerStreamState.receiveMessage()
        case .end:
          verdict = active.observerStreamState.receiveEnd()
        }

        // Store the updated value back.
        self = .active(active)

        switch verdict {
        case .drop:
          return .none
        case .allow:
          return .receiveRequestPartInObserver(part)
        }
      }
    }
  }
  495. // MARK: State Actions
  extension _BaseCallHandler {
    /// Carries out the action produced by a state transition.
    private func act(on action: State.Action) {
      switch action {
      case .none:
        break

      case let .receiveRequestPartInInterceptors(part):
        self.receiveRequestPartInInterceptors(part)

      case let .receiveRequestPartInObserver(part):
        self.receiveRequestPartInObserver(part)

      case let .receiveLibraryErrorInObserver(error):
        self.observeLibraryError(error)

      case let .sendResponsePartToInterceptors(part, promise):
        self.sendResponsePartToInterceptors(part, promise: promise)

      case let .writeResponsePartToChannel(context, part, promise):
        self.writeResponsePartToChannel(context: context, part: part, promise: promise)

      case let .completePromise(promise, result):
        promise?.completeWith(result)

      case let .multiple(actions):
        // Performed in array order.
        actions.forEach { self.act(on: $0) }
      }
    }

    /// Passes a request part to the interceptor pipeline; a no-op once the pipeline is gone.
    private func receiveRequestPartInInterceptors(_ part: GRPCServerRequestPart<Request>) {
      self.pipeline?.receive(part)
    }

    /// Dispatches a request part to the matching `observe…` hook implemented by the subclass.
    private func receiveRequestPartInObserver(_ part: GRPCServerRequestPart<Request>) {
      switch part {
      case .end:
        self.observeEnd()

      case let .message(request):
        self.observeRequest(request)

      case let .metadata(headers):
        self.observeHeaders(headers)
      }
    }

    /// Passes a response part to the interceptor pipeline. If the pipeline is already gone the
    /// promise is failed with `GRPCError.AlreadyComplete`.
    private func sendResponsePartToInterceptors(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      guard let pipeline = self.pipeline else {
        promise?.fail(GRPCError.AlreadyComplete())
        return
      }
      pipeline.send(part, promise: promise)
    }

    /// Converts a response part to its outbound form, writes it using `context`, and flushes
    /// when appropriate for the part.
    private func writeResponsePartToChannel(
      context: ChannelHandlerContext,
      part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      let outbound: OutboundOut
      let shouldFlush: Bool

      switch part {
      case let .metadata(headers):
        outbound = .headers(headers)
        // Headers are only flushed for streaming responses; otherwise the flush waits for the
        // response and end.
        shouldFlush = self.callType.isStreamingResponses

      case let .message(message, metadata):
        outbound = .message(.init(message, compressed: metadata.compress))
        // The message metadata says whether to flush.
        shouldFlush = metadata.flush

      case let .end(status, trailers):
        outbound = .statusAndTrailers(status, trailers)
        // End is always flushed.
        shouldFlush = true
      }

      context.write(self.wrapOutboundOut(outbound), promise: promise)

      if shouldFlush {
        context.flush()
      }
    }
  }