UnaryServerHandler.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. /*
  2. * Copyright 2021, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import NIOCore
  17. import NIOHPACK
  /// A server-side handler for a unary RPC.
  ///
  /// Request parts arrive from gRPC via the `receive*` methods and are forwarded into the
  /// interceptor pipeline. Parts emerging from the pipeline drive a small state machine which
  /// invokes the user function exactly once; the function's result is then sent back through the
  /// pipeline and finally to the response writer held by the `CallHandlerContext`.
  ///
  /// NOTE(review): `state` and `interceptors` are mutated without synchronization; presumably all
  /// calls are made on `context.eventLoop` — confirm against callers.
  public final class UnaryServerHandler<
    Serializer: MessageSerializer,
    Deserializer: MessageDeserializer
  >: GRPCServerHandlerProtocol {
    public typealias Request = Deserializer.Output
    public typealias Response = Serializer.Input

    /// A response serializer.
    @usableFromInline
    internal let serializer: Serializer

    /// A request deserializer.
    @usableFromInline
    internal let deserializer: Deserializer

    /// A pipeline of user provided interceptors.
    ///
    /// Set to `nil` by `finish()` once the handler is done with it. The pipeline is created with
    /// references to methods on `self`, so it must be released to let the handler be torn down.
    @usableFromInline
    internal var interceptors: ServerInterceptorPipeline<Request, Response>!

    /// The context required in order to create the function.
    @usableFromInline
    internal let context: CallHandlerContext

    /// A reference to a `UserInfo`.
    @usableFromInline
    internal let userInfoRef: Ref<UserInfo>

    /// The user provided function to execute.
    @usableFromInline
    internal let userFunction: (Request, StatusOnlyCallContext) -> EventLoopFuture<Response>

    /// The state of the function invocation.
    @usableFromInline
    internal var state: State = .idle

    @usableFromInline
    internal enum State {
      // Initial state. Nothing has happened yet.
      case idle
      // Headers have been received and now we're holding a context with which to invoke the user
      // function when we receive a message.
      case createdContext(UnaryResponseCallContext<Response>)
      // The user function has been invoked, we're waiting for the response.
      case invokedFunction(UnaryResponseCallContext<Response>)
      // The function has completed or we are no longer proceeding with execution (because of an
      // error or unexpected closure).
      case completed
    }

    /// Creates a handler for a single unary RPC.
    ///
    /// - Parameters:
    ///   - context: Provides the logger, event loop, path, response writer and other per-call
    ///     values used by the handler.
    ///   - requestDeserializer: Deserializes request bytes into `Request` messages.
    ///   - responseSerializer: Serializes the `Response` message into bytes.
    ///   - interceptors: User provided interceptors to place in the pipeline.
    ///   - userFunction: The function to invoke with the request message.
    @inlinable
    public init(
      context: CallHandlerContext,
      requestDeserializer: Deserializer,
      responseSerializer: Serializer,
      interceptors: [ServerInterceptor<Request, Response>],
      userFunction: @escaping (Request, StatusOnlyCallContext) -> EventLoopFuture<Response>
    ) {
      self.userFunction = userFunction
      self.serializer = responseSerializer
      self.deserializer = requestDeserializer
      self.context = context

      let userInfoRef = Ref(UserInfo())
      self.userInfoRef = userInfoRef
      // The pipeline is handed methods bound to `self`; `finish()` clears `interceptors` again.
      self.interceptors = ServerInterceptorPipeline(
        logger: context.logger,
        eventLoop: context.eventLoop,
        path: context.path,
        callType: .unary,
        remoteAddress: context.remoteAddress,
        userInfoRef: userInfoRef,
        closeFuture: context.closeFuture,
        interceptors: interceptors,
        onRequestPart: self.receiveInterceptedPart(_:),
        onResponsePart: self.sendInterceptedPart(_:promise:)
      )
    }

    // MARK: - Public API: gRPC to Interceptors

    /// Forwards request headers into the interceptor pipeline.
    @inlinable
    public func receiveMetadata(_ metadata: HPACKHeaders) {
      self.interceptors.receive(.metadata(metadata))
    }

    /// Deserializes a request message and forwards it into the interceptor pipeline. A
    /// deserialization failure is routed to `handleError(_:thrownFromHandler:)`.
    @inlinable
    public func receiveMessage(_ bytes: ByteBuffer) {
      do {
        let message = try self.deserializer.deserialize(byteBuffer: bytes)
        self.interceptors.receive(.message(message))
      } catch {
        self.handleError(error)
      }
    }

    /// Forwards the end of the request stream into the interceptor pipeline.
    @inlinable
    public func receiveEnd() {
      self.interceptors.receive(.end)
    }

    /// Handles an error from gRPC and then finishes the handler.
    @inlinable
    public func receiveError(_ error: Error) {
      self.handleError(error)
      self.finish()
    }

    /// Tears the handler down, failing any outstanding response promise and releasing the
    /// interceptor pipeline.
    @inlinable
    public func finish() {
      switch self.state {
      case .idle:
        self.interceptors = nil
        self.state = .completed

      case let .createdContext(context),
           let .invokedFunction(context):
        // Failing the promise runs 'userFunctionCompletedWithResult', which moves us to
        // 'completed' and sends 'end' through the interceptors.
        context.responsePromise.fail(GRPCStatus(code: .unavailable, message: nil))
        // Release the pipeline as well, otherwise it is never dropped on this path. Do so on a
        // later event loop tick so that the callback above has had the chance to use it.
        self.context.eventLoop.execute {
          self.interceptors = nil
        }

      case .completed:
        self.interceptors = nil
      }
    }

    // MARK: - Interceptors to User Function

    /// Dispatches a request part which has passed through the interceptor pipeline.
    @inlinable
    internal func receiveInterceptedPart(_ part: GRPCServerRequestPart<Request>) {
      switch part {
      case let .metadata(headers):
        self.receiveInterceptedMetadata(headers)
      case let .message(message):
        self.receiveInterceptedMessage(message)
      case .end:
        self.receiveInterceptedEnd()
      }
    }

    /// Handles request headers: creates the call context, registers for the function's result
    /// and sends back (empty) response headers.
    @inlinable
    internal func receiveInterceptedMetadata(_ headers: HPACKHeaders) {
      switch self.state {
      case .idle:
        // Make a context to invoke the user function with.
        let context = UnaryResponseCallContext<Response>(
          eventLoop: self.context.eventLoop,
          headers: headers,
          logger: self.context.logger,
          userInfoRef: self.userInfoRef,
          closeFuture: self.context.closeFuture
        )

        // Move to the next state.
        self.state = .createdContext(context)

        // Register a callback on the response future. The user function will complete this
        // promise.
        context.responsePromise.futureResult.whenComplete(self.userFunctionCompletedWithResult(_:))

        // Send back response headers.
        self.interceptors.send(.metadata([:]), promise: nil)

      case .createdContext, .invokedFunction:
        self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC"))

      case .completed:
        // We may receive headers from the interceptor pipeline if we have already finished (i.e.
        // due to an error or otherwise) and an interceptor doing some async work later emitting
        // headers. Dropping them is fine.
        ()
      }
    }

    /// Handles the request message: invokes the user function if headers have been received and
    /// the function has not already been invoked.
    @inlinable
    internal func receiveInterceptedMessage(_ request: Request) {
      switch self.state {
      case .idle:
        self.handleError(GRPCError.ProtocolViolation("Message received before headers"))

      case let .createdContext(context):
        // Happy path: execute the function; complete the promise with the result.
        self.state = .invokedFunction(context)
        context.responsePromise.completeWith(self.userFunction(request, context))

      case .invokedFunction:
        // The function's already been invoked with a message.
        self.handleError(GRPCError.ProtocolViolation("Multiple messages received on unary RPC"))

      case .completed:
        // We received a message but we're already done: this may happen if we terminate the RPC
        // due to a channel error, for example.
        ()
      }
    }

    /// Handles the end of the request stream; this is only valid once the function has been
    /// invoked (or the RPC has already completed).
    @inlinable
    internal func receiveInterceptedEnd() {
      switch self.state {
      case .idle:
        self.handleError(GRPCError.ProtocolViolation("End received before headers"))

      case .createdContext:
        self.handleError(GRPCError.ProtocolViolation("End received before message"))

      case .invokedFunction, .completed:
        ()
      }
    }

    // MARK: - User Function To Interceptors

    /// Callback for the response promise: sends the response message and 'end' on success, or
    /// processes the error on failure.
    @inlinable
    internal func userFunctionCompletedWithResult(_ result: Result<Response, Error>) {
      switch self.state {
      case .idle:
        // Invalid state: the user function can only complete if it was executed.
        preconditionFailure()

      // 'created' is allowed here: the promise may be failed (and the RPC torn down) after
      // receiving headers but before receiving a message.
      case let .createdContext(context),
           let .invokedFunction(context):
        switch result {
        case let .success(response):
          // Complete, as we're sending 'end'.
          self.state = .completed

          // Compression depends on whether it's enabled on the server and the setting in the
          // caller context.
          let compress = self.context.encoding.isEnabled && context.compressionEnabled
          let metadata = MessageMetadata(compress: compress, flush: false)
          self.interceptors.send(.message(response, metadata), promise: nil)
          self.interceptors.send(.end(context.responseStatus, context.trailers), promise: nil)

        case let .failure(error):
          self.handleError(error, thrownFromHandler: true)
        }

      case .completed:
        // We've already failed. Ignore this.
        ()
      }
    }

    /// Writes a response part which has passed through the interceptor pipeline to the response
    /// writer, serializing messages on the way.
    @inlinable
    internal func sendInterceptedPart(
      _ part: GRPCServerResponsePart<Response>,
      promise: EventLoopPromise<Void>?
    ) {
      switch part {
      case let .metadata(headers):
        // We can delay this flush until the end of the RPC.
        self.context.responseWriter.sendMetadata(headers, flush: false, promise: promise)

      case let .message(message, metadata):
        do {
          let bytes = try self.serializer.serialize(message, allocator: self.context.allocator)
          self.context.responseWriter.sendMessage(bytes, metadata: metadata, promise: promise)
        } catch {
          // Serialization failed: fail the promise and send end.
          promise?.fail(error)
          let (status, trailers) = ServerErrorProcessor.processLibraryError(
            error,
            delegate: self.context.errorDelegate
          )
          // Loop back via the interceptors.
          self.interceptors.send(.end(status, trailers), promise: nil)
        }

      case let .end(status, trailers):
        self.context.responseWriter.sendEnd(status: status, trailers: trailers, promise: promise)
      }
    }

    /// Terminates the RPC with a status derived from `error`.
    ///
    /// - Parameters:
    ///   - error: The error which caused the RPC to fail.
    ///   - isHandlerError: Whether the error came from the user function's result, in which case
    ///     it is processed as an observer error together with the call's headers and trailers.
    @inlinable
    internal func handleError(_ error: Error, thrownFromHandler isHandlerError: Bool = false) {
      switch self.state {
      case .idle:
        assert(!isHandlerError)
        self.state = .completed
        // We don't have a promise to fail. Just send back end.
        let (status, trailers) = ServerErrorProcessor.processLibraryError(
          error,
          delegate: self.context.errorDelegate
        )
        self.interceptors.send(.end(status, trailers), promise: nil)

      case let .createdContext(context),
           let .invokedFunction(context):
        // Move to 'completed' before touching the promise so that its callback is a no-op.
        self.state = .completed

        let status: GRPCStatus
        let trailers: HPACKHeaders

        if isHandlerError {
          (status, trailers) = ServerErrorProcessor.processObserverError(
            error,
            headers: context.headers,
            trailers: context.trailers,
            delegate: self.context.errorDelegate
          )
        } else {
          (status, trailers) = ServerErrorProcessor.processLibraryError(
            error,
            delegate: self.context.errorDelegate
          )
        }

        self.interceptors.send(.end(status, trailers), promise: nil)
        // We're already in the 'completed' state so failing the promise will be a no-op in the
        // callback to 'userFunctionCompletedWithResult' (but we also need to avoid leaking the
        // promise.)
        context.responsePromise.fail(error)

      case .completed:
        ()
      }
    }
  }