2
0

UnaryCallHandler.swift 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  /// A call handler for unary RPCs: the observer block is invoked with the request message.
  ///
  /// - The observer block is supplied by the framework user and returns a future holding the
  ///   result of the call.
  /// - The response is returned to the client when the framework user completes that future
  ///   (much like serving regular HTTP requests in frameworks such as Vapor).
  public final class UnaryCallHandler<
    RequestDeserializer: MessageDeserializer,
    ResponseSerializer: MessageSerializer
  >: _BaseCallHandler<RequestDeserializer, ResponseSerializer> {
    @usableFromInline
    internal typealias _Context = UnaryResponseCallContext<ResponsePayload>

    @usableFromInline
    internal typealias _Observer = (RequestPayload) -> EventLoopFuture<ResponsePayload>

    /// The current position of this handler in its (reduced) request/response state machine.
    @usableFromInline
    internal var _callHandlerState: _CallHandlerState

    @usableFromInline
    internal enum _CallHandlerState {
      // Compared with the main state machine, two states are deliberately absent here:
      // - 'requestOpenResponseIdle',
      // - 'requestClosedResponseIdle'
      //
      // Response headers are sent as part of leaving 'requestIdleResponseIdle', so the response
      // stream can never be less idle than the request stream.

      /// Nothing has happened yet: no request headers have been seen and no event observer has
      /// been created. Holds the factory used to create the observer.
      case requestIdleResponseIdle((_Context) -> _Observer)

      /// Request headers were received, an observer was created and response headers were sent
      /// back. The request message may or may not have been observed yet.
      case requestOpenResponseOpen(_Context, ObserverState)

      /// Request headers, a message and the end of the request stream have all been received.
      /// The observer has been invoked but has not finished processing the request.
      ///
      /// Note: being in this state implies a message was received; request headers followed
      /// directly by end would have closed the call fully instead.
      case requestClosedResponseOpen(_Context)

      /// The call is finished.
      case requestClosedResponseClosed

      /// Whether the event observer has been handed a request yet.
      @usableFromInline
      enum ObserverState {
        /// The observer exists but no request has been received for it.
        case notObserved(_Observer)

        /// The observer has been invoked with a request.
        case observed
      }
    }

    /// Creates a handler in the fully idle state.
    ///
    /// - Parameters:
    ///   - serializer: Forwarded to the base handler as the response serializer.
    ///   - deserializer: Forwarded to the base handler as the request deserializer.
    ///   - callHandlerContext: Forwarded to the base handler.
    ///   - interceptors: Forwarded to the base handler.
    ///   - eventObserverFactory: Stored until request headers arrive, then called with the call
    ///     context to produce the observer for the request message.
    @inlinable
    internal init(
      serializer: ResponseSerializer,
      deserializer: RequestDeserializer,
      callHandlerContext: CallHandlerContext,
      interceptors: [ServerInterceptor<RequestDeserializer.Output, ResponseSerializer.Input>],
      eventObserverFactory: @escaping (UnaryResponseCallContext<ResponsePayload>)
        -> (RequestPayload) -> EventLoopFuture<ResponsePayload>
    ) {
      self._callHandlerState = .requestIdleResponseIdle(eventObserverFactory)
      super.init(
        callHandlerContext: callHandlerContext,
        requestDeserializer: deserializer,
        responseSerializer: serializer,
        callType: .unary,
        interceptors: interceptors
      )
    }

    /// Moves to the closed state and fails the response promise if one is still outstanding.
    override public func channelInactive(context: ChannelHandlerContext) {
      super.channelInactive(context: context)

      // Whatever state we were in, we end up closed; only the promise handling differs.
      let stateBeforeClose = self._callHandlerState
      self._callHandlerState = .requestClosedResponseClosed

      switch stateBeforeClose {
      case .requestIdleResponseIdle,
           .requestClosedResponseClosed:
        // No call context exists in these states, so there is no promise to fail.
        ()

      case .requestOpenResponseOpen(let callContext, _),
           .requestClosedResponseOpen(let callContext):
        callContext.responsePromise.fail(GRPCError.AlreadyComplete())
      }
    }

    /// Handle an error produced by the event observer.
    private func handleObserverError(_ error: Error) {
      let callContext: _Context

      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: request observer hasn't been created")

      case .requestClosedResponseClosed:
        // The call is already closed (i.e. a library error got there first), so this error is
        // dropped.
        return

      case .requestOpenResponseOpen(let context, _),
           .requestClosedResponseOpen(let context):
        callContext = context
      }

      let (status, trailers) = self.processObserverError(
        error,
        headers: callContext.headers,
        trailers: callContext.trailers
      )
      // 'sendEnd' takes care of the response promise too.
      self.sendEnd(status: status, trailers: trailers)
    }

    /// Handle a 'library' error, i.e. an error emanating from the `Channel`.
    private func handleLibraryError(_ error: Error) {
      let shouldEndStream: Bool

      switch self._callHandlerState {
      case .requestIdleResponseIdle,
           .requestOpenResponseOpen(_, .notObserved):
        // No message has been seen, so the stream is closed by sending end.
        shouldEndStream = true

      case .requestOpenResponseOpen(_, .observed),
           .requestClosedResponseOpen:
        // A message has been seen and the observer is in flight: let it run its course.
        shouldEndStream = false

      case .requestClosedResponseClosed:
        // Already closed; nothing to do.
        shouldEndStream = false
      }

      guard shouldEndStream else {
        return
      }

      let (status, trailers) = self.processLibraryError(error)
      self.sendEnd(status: status, trailers: trailers)
    }

    // MARK: - Inbound

    override internal func observeLibraryError(_ error: Error) {
      self.handleLibraryError(error)
    }

    override internal func observeHeaders(_ headers: HPACKHeaders) {
      // The main state machine guards against receiving headers in any other state.
      guard case .requestIdleResponseIdle(let makeObserver) = self._callHandlerState else {
        preconditionFailure("Invalid state: request headers already received")
      }

      // Creating this context allocates a promise, yet the observer only sees a
      // 'StatusOnlyCallContext' and so can't reach it. The observer has to return a response
      // future, which is cascaded to this promise. A different context type here would avoid
      // the extra allocation.
      //
      // TODO: provide a new context without a promise.
      let callContext = _Context(
        eventLoop: self.eventLoop,
        headers: headers,
        logger: self.logger,
        userInfoRef: self._userInfoRef
      )
      let observer = makeObserver(callContext)

      // Both streams are now open (the response headers are written just below).
      self._callHandlerState = .requestOpenResponseOpen(callContext, .notObserved(observer))

      // Route the outcome of the response promise to the matching outbound path.
      callContext.responsePromise.futureResult.whenComplete { outcome in
        switch outcome {
        case .success(let response):
          self.sendResponse(response)
        case .failure(let error):
          self.handleObserverError(error)
        }
      }

      // Send the response headers back.
      self.sendResponsePartFromObserver(.metadata([:]), promise: nil)
    }

    override internal func observeRequest(_ message: RequestPayload) {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: request received before headers")

      case .requestOpenResponseOpen(_, .observed):
        // A request message was already observed. The main state machine doesn't guard against
        // too many messages on unary streams. Assuming downstream handlers do, this must come
        // from an errant interceptor, so it is ignored.
        ()

      case .requestOpenResponseOpen(let callContext, .notObserved(let observer)):
        self._callHandlerState = .requestOpenResponseOpen(callContext, .observed)
        // Invoke the observer block and complete the promise with the future it returns.
        let response = observer(message)
        callContext.responsePromise.completeWith(response)

      case .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state: the request stream has already been closed")
      }
    }

    override internal func observeEnd() {
      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: no request headers received")

      case .requestOpenResponseOpen(let callContext, .observed):
        // The request stream is now closed; the response is still pending.
        self._callHandlerState = .requestClosedResponseOpen(callContext)

      case .requestOpenResponseOpen(let callContext, .notObserved):
        // End arrived without a request: the stream was empty, so the observer will never be
        // invoked.
        callContext.responsePromise.fail(GRPCError.StreamCardinalityViolation.request)

      case .requestClosedResponseOpen,
           .requestClosedResponseClosed:
        preconditionFailure("Invalid state: request stream is already closed")
      }
    }

    // MARK: - Outbound

    /// Sends the response message followed by end, and moves to the closed state.
    private func sendResponse(_ message: ResponsePayload) {
      let callContext: _Context

      switch self._callHandlerState {
      case .requestIdleResponseIdle:
        preconditionFailure("Invalid state: can't send response before receiving headers and request")

      case .requestOpenResponseOpen(_, .notObserved):
        preconditionFailure("Invalid state: can't send response before receiving request")

      case .requestClosedResponseClosed:
        // Nothing to send once the call is closed.
        return

      case .requestOpenResponseOpen(let context, .observed),
           .requestClosedResponseOpen(let context):
        callContext = context
      }

      self._callHandlerState = .requestClosedResponseClosed

      // The message is written without requesting a flush; end is written straight after.
      self.sendResponsePartFromObserver(
        .message(message, .init(compress: callContext.compressionEnabled, flush: false)),
        promise: nil
      )
      self.sendResponsePartFromObserver(
        .end(callContext.responseStatus, callContext.trailers),
        promise: nil
      )
    }

    /// Sends end with the given status and trailers, and moves to the closed state.
    private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) {
      // Only 'requestOpenResponseOpen' also fails the response promise after sending end.
      let contextToFail: _Context?

      switch self._callHandlerState {
      case .requestClosedResponseClosed:
        // Nothing to send once the call is closed.
        return

      case .requestIdleResponseIdle,
           .requestClosedResponseOpen:
        contextToFail = nil

      case .requestOpenResponseOpen(let callContext, _):
        contextToFail = callContext
      }

      self._callHandlerState = .requestClosedResponseClosed
      self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
      contextToFail?.responsePromise.fail(status)
    }
  }