UnaryCallHandler.swift 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /*
  2. * Copyright 2019, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Foundation
  17. import Logging
  18. import NIO
  19. import NIOHPACK
  20. import NIOHTTP1
  21. import SwiftProtobuf
  22. /// Handles unary calls. Calls the observer block with the request message.
  23. ///
  24. /// - The observer block is implemented by the framework user and returns a future containing the call result.
  25. /// - To return a response to the client, the framework user should complete that future
  26. /// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
  27. public final class UnaryCallHandler<
  28. RequestPayload,
  29. ResponsePayload
  30. >: _BaseCallHandler<RequestPayload, ResponsePayload> {
  31. private typealias Context = UnaryResponseCallContext<ResponsePayload>
  32. private typealias Observer = (RequestPayload) -> EventLoopFuture<ResponsePayload>
  33. private var state: State
  34. private enum State {
  35. // We don't have the following states (which we do have in the main state machine):
  36. // - 'requestOpenResponseIdle',
  37. // - 'requestClosedResponseIdle'
  38. //
  39. // We'll send headers back when we transition away from 'requestIdleResponseIdle' so the
  40. // response stream can never be less idle than the request stream.
  41. /// Fully idle, we haven't seen the request headers yet and we haven't made an event observer
  42. /// yet.
  43. case requestIdleResponseIdle((Context) -> Observer)
  44. /// Received the request headers, created an observer and have sent back response headers.
  45. /// We may or may not have observer the request message yet.
  46. case requestOpenResponseOpen(Context, ObserverState)
  47. /// Received the request headers, a message and the end of the request stream. The observer has
  48. /// been invoked but it hasn't yet finished processing the request.
  49. ///
  50. /// Note: we know we've received a message if we're in this state, if we had seen the request
  51. /// headers followed by end we'd fully close.
  52. case requestClosedResponseOpen(Context)
  53. /// We're done.
  54. case requestClosedResponseClosed
  55. /// The state of the event observer.
  56. enum ObserverState {
  57. /// We have an event observer, but haven't yet received a request.
  58. case notObserved(Observer)
  59. /// We've invoked the event observer with a request.
  60. case observed
  61. }
  62. }
  63. internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
  64. serializer: Serializer,
  65. deserializer: Deserializer,
  66. callHandlerContext: CallHandlerContext,
  67. interceptors: [ServerInterceptor<Deserializer.Output, Serializer.Input>],
  68. eventObserverFactory: @escaping (UnaryResponseCallContext<ResponsePayload>)
  69. -> (RequestPayload) -> EventLoopFuture<ResponsePayload>
  70. ) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
  71. self.state = .requestIdleResponseIdle(eventObserverFactory)
  72. super.init(
  73. callHandlerContext: callHandlerContext,
  74. codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer),
  75. callType: .unary,
  76. interceptors: interceptors
  77. )
  78. }
  79. override public func channelInactive(context: ChannelHandlerContext) {
  80. super.channelInactive(context: context)
  81. // Fail any remaining promise.
  82. switch self.state {
  83. case .requestIdleResponseIdle,
  84. .requestClosedResponseClosed:
  85. self.state = .requestClosedResponseClosed
  86. case let .requestOpenResponseOpen(context, _),
  87. let .requestClosedResponseOpen(context):
  88. self.state = .requestClosedResponseClosed
  89. context.responsePromise.fail(GRPCError.AlreadyComplete())
  90. }
  91. }
  92. /// Handle an error from the event observer.
  93. private func handleObserverError(_ error: Error) {
  94. switch self.state {
  95. case .requestIdleResponseIdle:
  96. preconditionFailure("Invalid state: request observer hasn't been created")
  97. case let .requestOpenResponseOpen(context, _),
  98. let .requestClosedResponseOpen(context):
  99. let (status, trailers) = self.processObserverError(
  100. error,
  101. headers: context.headers,
  102. trailers: context.trailers
  103. )
  104. // This will handle the response promise as well.
  105. self.sendEnd(status: status, trailers: trailers)
  106. case .requestClosedResponseClosed:
  107. // We hit an error, but we're already closed (i.e. we hit a library error first). Ignore
  108. // the error.
  109. ()
  110. }
  111. }
  112. /// Handle a 'library' error, i.e. an error emanating from the `Channel`.
  113. private func handleLibraryError(_ error: Error) {
  114. switch self.state {
  115. case .requestIdleResponseIdle,
  116. .requestOpenResponseOpen(_, .notObserved):
  117. // We haven't seen a message, we'll send end to close the stream.
  118. let (status, trailers) = self.processLibraryError(error)
  119. self.sendEnd(status: status, trailers: trailers)
  120. case .requestOpenResponseOpen(_, .observed),
  121. .requestClosedResponseOpen:
  122. // We've seen a message, the observer is in flight, we'll let it play out.
  123. ()
  124. case .requestClosedResponseClosed:
  125. // We're already closed, we can just ignore this.
  126. ()
  127. }
  128. }
  129. // MARK: - Inbound
  130. override internal func observeLibraryError(_ error: Error) {
  131. self.handleLibraryError(error)
  132. }
  133. override internal func observeHeaders(_ headers: HPACKHeaders) {
  134. switch self.state {
  135. case let .requestIdleResponseIdle(factory):
  136. // This allocates a promise, but the observer is provided with 'StatusOnlyCallContext' and
  137. // doesn't get access to the promise. The observer must return a response future instead
  138. // which we cascade to this promise. We can avoid this extra allocation by using a different
  139. // context here.
  140. //
  141. // TODO: provide a new context without a promise.
  142. let context = UnaryResponseCallContext<ResponsePayload>(
  143. eventLoop: self.eventLoop,
  144. headers: headers,
  145. logger: self.logger,
  146. userInfoRef: self.userInfoRef
  147. )
  148. let observer = factory(context)
  149. // We're fully open now (we'll send the response headers back in a moment).
  150. self.state = .requestOpenResponseOpen(context, .notObserved(observer))
  151. // Register callbacks for the response promise.
  152. context.responsePromise.futureResult.whenComplete { result in
  153. switch result {
  154. case let .success(response):
  155. self.sendResponse(response)
  156. case let .failure(error):
  157. self.handleObserverError(error)
  158. }
  159. }
  160. // Write back the response headers.
  161. self.sendResponsePartFromObserver(.metadata([:]), promise: nil)
  162. // The main state machine guards against these states.
  163. case .requestOpenResponseOpen,
  164. .requestClosedResponseOpen,
  165. .requestClosedResponseClosed:
  166. preconditionFailure("Invalid state: request headers already received")
  167. }
  168. }
  169. override internal func observeRequest(_ message: RequestPayload) {
  170. switch self.state {
  171. case .requestIdleResponseIdle:
  172. preconditionFailure("Invalid state: request received before headers")
  173. case let .requestOpenResponseOpen(context, request):
  174. switch request {
  175. case .observed:
  176. // We've already observed the request message. The main state machine doesn't guard against
  177. // too many messages for unary streams. Assuming downstream handlers protect against this
  178. // then this must be an errant interceptor, we'll ignore it.
  179. ()
  180. case let .notObserved(observer):
  181. self.state = .requestOpenResponseOpen(context, .observed)
  182. // Complete the promise with the observer block.
  183. context.responsePromise.completeWith(observer(message))
  184. }
  185. case .requestClosedResponseOpen,
  186. .requestClosedResponseClosed:
  187. preconditionFailure("Invalid state: the request stream has already been closed")
  188. }
  189. }
  190. override internal func observeEnd() {
  191. switch self.state {
  192. case .requestIdleResponseIdle:
  193. preconditionFailure("Invalid state: no request headers received")
  194. case let .requestOpenResponseOpen(context, request):
  195. switch request {
  196. case .observed:
  197. // Close the request stream.
  198. self.state = .requestClosedResponseOpen(context)
  199. case .notObserved:
  200. // We haven't received a request: this is an empty stream, the observer will never be
  201. // invoked.
  202. context.responsePromise.fail(GRPCError.StreamCardinalityViolation.request)
  203. }
  204. case .requestClosedResponseOpen,
  205. .requestClosedResponseClosed:
  206. preconditionFailure("Invalid state: request stream is already closed")
  207. }
  208. }
  209. // MARK: - Outbound
  210. private func sendResponse(_ message: ResponsePayload) {
  211. switch self.state {
  212. case .requestIdleResponseIdle:
  213. preconditionFailure("Invalid state: can't send response before receiving headers and request")
  214. case .requestOpenResponseOpen(_, .notObserved):
  215. preconditionFailure("Invalid state: can't send response before receiving request")
  216. case let .requestOpenResponseOpen(context, .observed),
  217. let .requestClosedResponseOpen(context):
  218. self.state = .requestClosedResponseClosed
  219. self.sendResponsePartFromObserver(
  220. .message(message, .init(compress: context.compressionEnabled, flush: false)),
  221. promise: nil
  222. )
  223. self.sendResponsePartFromObserver(
  224. .end(context.responseStatus, context.trailers),
  225. promise: nil
  226. )
  227. case .requestClosedResponseClosed:
  228. // Already closed, do nothing.
  229. ()
  230. }
  231. }
  232. private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) {
  233. switch self.state {
  234. case .requestIdleResponseIdle,
  235. .requestClosedResponseOpen:
  236. self.state = .requestClosedResponseClosed
  237. self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
  238. case let .requestOpenResponseOpen(context, _):
  239. self.state = .requestClosedResponseClosed
  240. self.sendResponsePartFromObserver(.end(status, trailers), promise: nil)
  241. // Fail the promise.
  242. context.responsePromise.fail(status)
  243. case .requestClosedResponseClosed:
  244. // Already closed, do nothing.
  245. ()
  246. }
  247. }
  248. }