_BaseCallHandler.swift 21 KB


  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import SwiftProtobuf
  21. /// Provides a means for decoding incoming gRPC messages into protobuf objects.
  22. ///
  23. /// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
  24. /// - Important: This is **NOT** part of the public API.
  25. public class _BaseCallHandler<
  26. RequestDeserializer: MessageDeserializer,
  27. ResponseSerializer: MessageSerializer
  28. >: GRPCCallHandler, ChannelInboundHandler {
  29. public typealias RequestPayload = RequestDeserializer.Output
  30. public typealias ResponsePayload = ResponseSerializer.Input
  31. public typealias InboundIn = GRPCServerRequestPart<ByteBuffer>
  32. public typealias OutboundOut = GRPCServerResponsePart<ByteBuffer>
  33. /// An interceptor pipeline.
  34. private var pipeline: ServerInterceptorPipeline<RequestPayload, ResponsePayload>?
  35. /// Our current state.
  36. private var state: State = .idle
  37. /// The type of this RPC, e.g. 'unary'.
  38. private let callType: GRPCCallType
  39. /// Some context provided to us from the routing handler.
  40. private let callHandlerContext: CallHandlerContext
  41. /// A request deserializer.
  42. private let requestDeserializer: RequestDeserializer
  43. /// A response serializer.
  44. private let responseSerializer: ResponseSerializer
  45. /// The `ChannelHandlerContext`.
  46. private var context: ChannelHandlerContext?
  47. /// The event loop this call is being handled on.
  48. internal var eventLoop: EventLoop {
  49. return self.callHandlerContext.eventLoop
  50. }
  51. /// An error delegate.
  52. internal var errorDelegate: ServerErrorDelegate? {
  53. return self.callHandlerContext.errorDelegate
  54. }
  55. /// A logger.
  56. internal var logger: Logger {
  57. return self.callHandlerContext.logger
  58. }
  59. /// A reference to `UserInfo`.
  60. internal var userInfoRef: Ref<UserInfo>
  61. internal init(
  62. callHandlerContext: CallHandlerContext,
  63. requestDeserializer: RequestDeserializer,
  64. responseSerializer: ResponseSerializer,
  65. callType: GRPCCallType,
  66. interceptors: [ServerInterceptor<RequestPayload, ResponsePayload>]
  67. ) {
  68. let userInfoRef = Ref(UserInfo())
  69. self.requestDeserializer = requestDeserializer
  70. self.responseSerializer = responseSerializer
  71. self.callHandlerContext = callHandlerContext
  72. self.callType = callType
  73. self.userInfoRef = userInfoRef
  74. self.pipeline = ServerInterceptorPipeline(
  75. logger: callHandlerContext.logger,
  76. eventLoop: callHandlerContext.eventLoop,
  77. path: callHandlerContext.path,
  78. callType: callType,
  79. userInfoRef: userInfoRef,
  80. interceptors: interceptors,
  81. onRequestPart: self.receiveRequestPartFromInterceptors(_:),
  82. onResponsePart: self.sendResponsePartFromInterceptors(_:promise:)
  83. )
  84. }
  85. // MARK: - ChannelHandler
  86. public func handlerAdded(context: ChannelHandlerContext) {
  87. self.state.handlerAdded()
  88. self.context = context
  89. }
  90. public func handlerRemoved(context: ChannelHandlerContext) {
  91. self.pipeline = nil
  92. self.context = nil
  93. }
  94. public func channelInactive(context: ChannelHandlerContext) {
  95. self.pipeline = nil
  96. context.fireChannelInactive()
  97. }
  98. public func errorCaught(context: ChannelHandlerContext, error: Error) {
  99. if self.state.errorCaught() {
  100. self.observeLibraryError(error)
  101. }
  102. }
  103. public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
  104. let part = self.unwrapInboundIn(data)
  105. switch part {
  106. case let .metadata(headers):
  107. if self.state.channelReadMetadata() {
  108. self.receiveRequestPartInInterceptors(.metadata(headers))
  109. }
  110. case let .message(buffer):
  111. if self.state.channelReadMessage() {
  112. do {
  113. let request = try self.requestDeserializer.deserialize(byteBuffer: buffer)
  114. self.receiveRequestPartInInterceptors(.message(request))
  115. } catch {
  116. self.errorCaught(context: context, error: error)
  117. }
  118. }
  119. case .end:
  120. if self.state.channelReadEnd() {
  121. self.receiveRequestPartInInterceptors(.end)
  122. }
  123. }
  124. // We're the last handler. We don't have anything to forward.
  125. }
  126. // MARK: - Event Observer
  127. internal func observeHeaders(_ headers: HPACKHeaders) {
  128. fatalError("must be overridden by subclasses")
  129. }
  130. internal func observeRequest(_ message: RequestPayload) {
  131. fatalError("must be overridden by subclasses")
  132. }
  133. internal func observeEnd() {
  134. fatalError("must be overridden by subclasses")
  135. }
  136. internal func observeLibraryError(_ error: Error) {
  137. fatalError("must be overridden by subclasses")
  138. }
  139. /// Send a response part to the interceptor pipeline. Called by an event observer.
  140. /// - Parameters:
  141. /// - part: The response part to send.
  142. /// - promise: A promise to complete once the response part has been written.
  143. internal func sendResponsePartFromObserver(
  144. _ part: GRPCServerResponsePart<ResponsePayload>,
  145. promise: EventLoopPromise<Void>?
  146. ) {
  147. let forward: Bool
  148. switch part {
  149. case .metadata:
  150. forward = self.state.sendResponsePartFromObserver(.metadata)
  151. case .message:
  152. forward = self.state.sendResponsePartFromObserver(.message)
  153. case .end:
  154. forward = self.state.sendResponsePartFromObserver(.end)
  155. }
  156. if forward {
  157. self.sendResponsePartToInterceptors(part, promise: promise)
  158. } else {
  159. promise?.fail(GRPCError.AlreadyComplete())
  160. }
  161. }
  162. /// Processes a library error to form a `GRPCStatus` and trailers to send back to the client.
  163. /// - Parameter error: The error to process.
  164. /// - Returns: The status and trailers to send to the client.
  165. internal func processLibraryError(_ error: Error) -> (GRPCStatus, HPACKHeaders) {
  166. // Observe the error if we have a delegate.
  167. self.errorDelegate?.observeLibraryError(error)
  168. // What status are we terminating this RPC with?
  169. // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge
  170. // them with any on the call context.
  171. // - If we don't have a delegate, then try to transform the error to a status.
  172. // - Fallback to a generic error.
  173. let status: GRPCStatus
  174. let trailers: HPACKHeaders
  175. if let transformed = self.errorDelegate?.transformLibraryError(error) {
  176. status = transformed.status
  177. trailers = transformed.trailers ?? [:]
  178. } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
  179. status = grpcStatusTransformable.makeGRPCStatus()
  180. trailers = [:]
  181. } else {
  182. // Eh... well, we don't what status to use. Use a generic one.
  183. status = .processingError
  184. trailers = [:]
  185. }
  186. return (status, trailers)
  187. }
  188. /// Processes an error, transforming it into a 'GRPCStatus' and any trailers to send to the peer.
  189. internal func processObserverError(
  190. _ error: Error,
  191. headers: HPACKHeaders,
  192. trailers: HPACKHeaders
  193. ) -> (GRPCStatus, HPACKHeaders) {
  194. // Observe the error if we have a delegate.
  195. self.errorDelegate?.observeRequestHandlerError(error, headers: headers)
  196. // What status are we terminating this RPC with?
  197. // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge
  198. // them with any on the call context.
  199. // - If we don't have a delegate, then try to transform the error to a status.
  200. // - Fallback to a generic error.
  201. let status: GRPCStatus
  202. let mergedTrailers: HPACKHeaders
  203. if let transformed = self.errorDelegate?.transformRequestHandlerError(error, headers: headers) {
  204. status = transformed.status
  205. if var transformedTrailers = transformed.trailers {
  206. // The delegate returned trailers: merge in those from the context as well.
  207. transformedTrailers.add(contentsOf: trailers)
  208. mergedTrailers = transformedTrailers
  209. } else {
  210. mergedTrailers = trailers
  211. }
  212. } else if let grpcStatusTransformable = error as? GRPCStatusTransformable {
  213. status = grpcStatusTransformable.makeGRPCStatus()
  214. mergedTrailers = trailers
  215. } else {
  216. // Eh... well, we don't what status to use. Use a generic one.
  217. status = .processingError
  218. mergedTrailers = trailers
  219. }
  220. return (status, mergedTrailers)
  221. }
  222. }
  223. // MARK: - Interceptor API
  224. extension _BaseCallHandler {
  225. /// Receive a request part from the interceptors pipeline to forward to the event observer.
  226. /// - Parameter part: The request part to forward.
  227. private func receiveRequestPartFromInterceptors(_ part: GRPCServerRequestPart<RequestPayload>) {
  228. let forward: Bool
  229. switch part {
  230. case .metadata:
  231. forward = self.state.receiveRequestPartFromInterceptors(.metadata)
  232. case .message:
  233. forward = self.state.receiveRequestPartFromInterceptors(.message)
  234. case .end:
  235. forward = self.state.receiveRequestPartFromInterceptors(.end)
  236. }
  237. if forward {
  238. self.receiveRequestPartInObserver(part)
  239. }
  240. }
  241. /// Send a response part via the `Channel`. Called once the response part has traversed the
  242. /// interceptor pipeline.
  243. /// - Parameters:
  244. /// - part: The response part to send.
  245. /// - promise: A promise to complete once the response part has been written.
  246. private func sendResponsePartFromInterceptors(
  247. _ part: GRPCServerResponsePart<ResponsePayload>,
  248. promise: EventLoopPromise<Void>?
  249. ) {
  250. let forward: Bool
  251. switch part {
  252. case .metadata:
  253. forward = self.state.sendResponsePartFromInterceptors(.metadata)
  254. case .message:
  255. forward = self.state.sendResponsePartFromInterceptors(.message)
  256. case .end:
  257. forward = self.state.sendResponsePartFromInterceptors(.end)
  258. }
  259. if forward, let context = self.context {
  260. self.writeResponsePartToChannel(context: context, part: part, promise: promise)
  261. } else {
  262. promise?.fail(GRPCError.AlreadyComplete())
  263. }
  264. }
  265. }
  266. // MARK: - State
  267. private enum State {
  268. /// Idle. We're waiting to be added to a pipeline.
  269. case idle
  270. /// We're in a pipeline and receiving from the client.
  271. case active(ActiveState)
  272. /// We're done. This state is terminal, all actions are ignored.
  273. case closed
  274. }
  275. private enum RPCStreamPart {
  276. case metadata
  277. case message
  278. case end
  279. }
  280. extension State {
  281. /// The state of the request and response streams.
  282. ///
  283. /// We track the stream state twice: between the 'Channel' and interceptor pipeline, and between
  284. /// the interceptor pipeline and event observer.
  285. fileprivate enum StreamState {
  286. case requestIdleResponseIdle
  287. case requestOpenResponseIdle
  288. case requestOpenResponseOpen
  289. case requestClosedResponseIdle
  290. case requestClosedResponseOpen
  291. case requestClosedResponseClosed
  292. mutating func receiveHeaders() -> Bool {
  293. switch self {
  294. case .requestIdleResponseIdle:
  295. self = .requestOpenResponseIdle
  296. return true
  297. case .requestOpenResponseIdle,
  298. .requestOpenResponseOpen,
  299. .requestClosedResponseIdle,
  300. .requestClosedResponseOpen,
  301. .requestClosedResponseClosed:
  302. return false
  303. }
  304. }
  305. func receiveMessage() -> Bool {
  306. switch self {
  307. case .requestOpenResponseIdle,
  308. .requestOpenResponseOpen:
  309. return true
  310. case .requestIdleResponseIdle,
  311. .requestClosedResponseIdle,
  312. .requestClosedResponseOpen,
  313. .requestClosedResponseClosed:
  314. return false
  315. }
  316. }
  317. mutating func receiveEnd() -> Bool {
  318. switch self {
  319. case .requestOpenResponseIdle:
  320. self = .requestClosedResponseIdle
  321. return true
  322. case .requestOpenResponseOpen:
  323. self = .requestClosedResponseOpen
  324. return true
  325. case .requestIdleResponseIdle,
  326. .requestClosedResponseIdle,
  327. .requestClosedResponseOpen,
  328. .requestClosedResponseClosed:
  329. return false
  330. }
  331. }
  332. mutating func sendHeaders() -> Bool {
  333. switch self {
  334. case .requestOpenResponseIdle:
  335. self = .requestOpenResponseOpen
  336. return true
  337. case .requestClosedResponseIdle:
  338. self = .requestClosedResponseOpen
  339. return true
  340. case .requestIdleResponseIdle,
  341. .requestOpenResponseOpen,
  342. .requestClosedResponseOpen,
  343. .requestClosedResponseClosed:
  344. return false
  345. }
  346. }
  347. func sendMessage() -> Bool {
  348. switch self {
  349. case .requestOpenResponseOpen,
  350. .requestClosedResponseOpen:
  351. return true
  352. case .requestIdleResponseIdle,
  353. .requestOpenResponseIdle,
  354. .requestClosedResponseIdle,
  355. .requestClosedResponseClosed:
  356. return false
  357. }
  358. }
  359. mutating func sendEnd() -> Bool {
  360. switch self {
  361. case .requestIdleResponseIdle:
  362. return false
  363. case .requestOpenResponseIdle,
  364. .requestOpenResponseOpen,
  365. .requestClosedResponseIdle,
  366. .requestClosedResponseOpen:
  367. self = .requestClosedResponseClosed
  368. return true
  369. case .requestClosedResponseClosed:
  370. return false
  371. }
  372. }
  373. }
  374. fileprivate struct ActiveState {
  375. /// The stream state between the 'Channel' and interceptor pipeline.
  376. var channelStreamState: StreamState
  377. /// The stream state between the interceptor pipeline and event observer.
  378. var observerStreamState: StreamState
  379. init() {
  380. self.channelStreamState = .requestIdleResponseIdle
  381. self.observerStreamState = .requestIdleResponseIdle
  382. }
  383. }
  384. }
  385. extension State {
  386. /// The handler was added to the `ChannelPipeline`: this is the only way to move from the `.idle`
  387. /// state. We only expect this to be called once.
  388. internal mutating func handlerAdded() {
  389. switch self {
  390. case .idle:
  391. // This is the only way we can become active.
  392. self = .active(.init())
  393. case .active:
  394. preconditionFailure("Invalid state: already active")
  395. case .closed:
  396. ()
  397. }
  398. }
  399. /// Received an error from the `Channel`.
  400. /// - Returns: True if the error should be forwarded to the error observer, or false if it should
  401. /// be dropped.
  402. internal func errorCaught() -> Bool {
  403. switch self {
  404. case .active:
  405. return true
  406. case .idle, .closed:
  407. return false
  408. }
  409. }
  410. /// Receive a metadata part from the `Channel`.
  411. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  412. internal mutating func channelReadMetadata() -> Bool {
  413. switch self {
  414. case .idle:
  415. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  416. case var .active(state):
  417. let allow = state.channelStreamState.receiveHeaders()
  418. self = .active(state)
  419. return allow
  420. case .closed:
  421. return false
  422. }
  423. }
  424. /// Receive a message part from the `Channel`.
  425. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  426. internal func channelReadMessage() -> Bool {
  427. switch self {
  428. case .idle:
  429. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  430. case let .active(state):
  431. return state.channelStreamState.receiveMessage()
  432. case .closed:
  433. return false
  434. }
  435. }
  436. /// Receive an end-stream part from the `Channel`.
  437. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  438. internal mutating func channelReadEnd() -> Bool {
  439. switch self {
  440. case .idle:
  441. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  442. case var .active(state):
  443. let allow = state.channelStreamState.receiveEnd()
  444. self = .active(state)
  445. return allow
  446. case .closed:
  447. return false
  448. }
  449. }
  450. /// Send a response part from the observer to the interceptors.
  451. /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise.
  452. internal mutating func sendResponsePartFromObserver(_ part: RPCStreamPart) -> Bool {
  453. switch self {
  454. case .idle:
  455. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  456. case var .active(state):
  457. // Avoid CoW-ing 'state'.
  458. self = .idle
  459. let allow: Bool
  460. switch part {
  461. case .metadata:
  462. allow = state.observerStreamState.sendHeaders()
  463. case .message:
  464. allow = state.observerStreamState.sendMessage()
  465. case .end:
  466. allow = state.observerStreamState.sendEnd()
  467. }
  468. // Restore the state.
  469. self = .active(state)
  470. return allow
  471. case .closed:
  472. return false
  473. }
  474. }
  475. /// Send a response part from the interceptors to the `Channel`.
  476. /// - Returns: True if the part should be forwarded to the `Channel`, false otherwise.
  477. internal mutating func sendResponsePartFromInterceptors(_ part: RPCStreamPart) -> Bool {
  478. switch self {
  479. case .idle:
  480. preconditionFailure("Invalid state: can't send response on idle call")
  481. case var .active(state):
  482. // Avoid CoW-ing 'state'.
  483. self = .idle
  484. let allow: Bool
  485. switch part {
  486. case .metadata:
  487. allow = state.channelStreamState.sendHeaders()
  488. self = .active(state)
  489. case .message:
  490. allow = state.channelStreamState.sendMessage()
  491. self = .active(state)
  492. case .end:
  493. allow = state.channelStreamState.sendEnd()
  494. // We're sending end, we're no longer active.
  495. self = .closed
  496. }
  497. return allow
  498. case .closed:
  499. // We're already closed.
  500. return false
  501. }
  502. }
  503. /// A request part has traversed the interceptor pipeline, now send it to the observer.
  504. /// - Returns: True if the part should be forwarded to the observer, false otherwise.
  505. internal mutating func receiveRequestPartFromInterceptors(_ part: RPCStreamPart) -> Bool {
  506. switch self {
  507. case .idle:
  508. preconditionFailure("Invalid state: the handler isn't in the pipeline yet")
  509. case var .active(state):
  510. // Avoid CoW-ing `state`.
  511. self = .idle
  512. let allow: Bool
  513. // Does the active state allow us to send this?
  514. switch part {
  515. case .metadata:
  516. allow = state.observerStreamState.receiveHeaders()
  517. case .message:
  518. allow = state.observerStreamState.receiveMessage()
  519. case .end:
  520. allow = state.observerStreamState.receiveEnd()
  521. }
  522. // Put `state` back.
  523. self = .active(state)
  524. return allow
  525. case .closed:
  526. // We're closed, just ignore this.
  527. return false
  528. }
  529. }
  530. }
  531. // MARK: State Actions
  532. extension _BaseCallHandler {
  533. /// Receives a request part in the interceptor pipeline.
  534. private func receiveRequestPartInInterceptors(_ part: GRPCServerRequestPart<RequestPayload>) {
  535. self.pipeline?.receive(part)
  536. }
  537. /// Observe a request part. This just farms out to the subclass implementation for the
  538. /// appropriate part.
  539. private func receiveRequestPartInObserver(_ part: GRPCServerRequestPart<RequestPayload>) {
  540. switch part {
  541. case let .metadata(headers):
  542. self.observeHeaders(headers)
  543. case let .message(request):
  544. self.observeRequest(request)
  545. case .end:
  546. self.observeEnd()
  547. }
  548. }
  549. /// Sends a response part into the interceptor pipeline.
  550. private func sendResponsePartToInterceptors(
  551. _ part: GRPCServerResponsePart<ResponsePayload>,
  552. promise: EventLoopPromise<Void>?
  553. ) {
  554. if let pipeline = self.pipeline {
  555. pipeline.send(part, promise: promise)
  556. } else {
  557. promise?.fail(GRPCError.AlreadyComplete())
  558. }
  559. }
  560. /// Writes a response part to the `Channel`.
  561. private func writeResponsePartToChannel(
  562. context: ChannelHandlerContext,
  563. part: GRPCServerResponsePart<ResponsePayload>,
  564. promise: EventLoopPromise<Void>?
  565. ) {
  566. let flush: Bool
  567. switch part {
  568. case let .metadata(headers):
  569. // Only flush if we're streaming responses, if we're not streaming responses then we'll wait
  570. // for the response and end before emitting the flush.
  571. flush = self.callType.isStreamingResponses
  572. context.write(self.wrapOutboundOut(.metadata(headers)), promise: promise)
  573. case let .message(message, metadata):
  574. do {
  575. let serializedResponse = try self.responseSerializer.serialize(
  576. message,
  577. allocator: context.channel.allocator
  578. )
  579. context.write(
  580. self.wrapOutboundOut(.message(serializedResponse, metadata)),
  581. promise: promise
  582. )
  583. // Flush if we've been told to flush.
  584. flush = metadata.flush
  585. } catch {
  586. self.errorCaught(context: context, error: error)
  587. promise?.fail(error)
  588. return
  589. }
  590. case let .end(status, trailers):
  591. context.write(self.wrapOutboundOut(.end(status, trailers)), promise: promise)
  592. // Always flush on end.
  593. flush = true
  594. }
  595. if flush {
  596. context.flush()
  597. }
  598. }
  599. }