RPCRouter.swift 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. /*
  2. * Copyright 2023, gRPC Authors All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. /// Stores and provides handlers for RPCs.
  17. ///
  18. /// The router stores a handler for each RPC it knows about. Each handler encapsulate the business
  19. /// logic for the RPC which is typically implemented by service owners. To register a handler you
  20. /// can call ``registerHandler(forMethod:deserializer:serializer:handler:)``. You can check whether
  21. /// the router has a handler for a method with ``hasHandler(forMethod:)`` or get a list of all
  22. /// methods with handlers registered by calling ``methods``. You can also remove the handler for a
  23. /// given method by calling ``removeHandler(forMethod:)``.
  24. /// You can also register any interceptors that you want applied to registered handlers via the
  25. /// ``registerInterceptors(pipeline:)`` method.
  26. ///
  27. /// In most cases you won't need to interact with the router directly. Instead you should register
  28. /// your services with ``GRPCServer/init(transport:services:interceptors:)`` which will in turn
  29. /// register each method with the router.
  30. ///
  31. /// You may wish to not serve all methods from your service in which case you can either:
  32. ///
  33. /// 1. Remove individual methods by calling ``removeHandler(forMethod:)``, or
  34. /// 2. Implement ``RegistrableRPCService/registerMethods(with:)`` to register only the methods you
  35. /// want to be served.
  36. @available(gRPCSwift 2.0, *)
  37. public struct RPCRouter<Transport: ServerTransport>: Sendable {
  38. @usableFromInline
  39. struct RPCHandler: Sendable {
  40. @usableFromInline
  41. let _fn:
  42. @Sendable (
  43. _ stream: RPCStream<
  44. RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
  45. RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
  46. >,
  47. _ context: ServerContext,
  48. _ interceptors: [any ServerInterceptor]
  49. ) async -> Void
  50. @inlinable
  51. init<Input, Output>(
  52. method: MethodDescriptor,
  53. deserializer: some MessageDeserializer<Input>,
  54. serializer: some MessageSerializer<Output>,
  55. handler: @Sendable @escaping (
  56. _ request: StreamingServerRequest<Input>,
  57. _ context: ServerContext
  58. ) async throws -> StreamingServerResponse<Output>
  59. ) {
  60. self._fn = { stream, context, interceptors in
  61. await ServerRPCExecutor.execute(
  62. context: context,
  63. stream: stream,
  64. deserializer: deserializer,
  65. serializer: serializer,
  66. interceptors: interceptors,
  67. handler: handler
  68. )
  69. }
  70. }
  71. @inlinable
  72. func handle(
  73. stream: RPCStream<
  74. RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
  75. RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
  76. >,
  77. context: ServerContext,
  78. interceptors: [any ServerInterceptor]
  79. ) async {
  80. await self._fn(stream, context, interceptors)
  81. }
  82. }
  83. @usableFromInline
  84. private(set) var handlers:
  85. [MethodDescriptor: (handler: RPCHandler, interceptors: [any ServerInterceptor])]
  86. /// Creates a new router with no methods registered.
  87. public init() {
  88. self.handlers = [:]
  89. }
  90. /// Returns all descriptors known to the router in an undefined order.
  91. public var methods: [MethodDescriptor] {
  92. Array(self.handlers.keys)
  93. }
  94. /// Returns the number of methods registered with the router.
  95. public var count: Int {
  96. self.handlers.count
  97. }
  98. /// Returns whether a handler exists for a given method.
  99. ///
  100. /// - Parameter descriptor: A descriptor of the method.
  101. /// - Returns: Whether a handler exists for the method.
  102. public func hasHandler(forMethod descriptor: MethodDescriptor) -> Bool {
  103. return self.handlers.keys.contains(descriptor)
  104. }
  105. /// Registers a handler with the router.
  106. ///
  107. /// - Note: if a handler already exists for a given method then it will be replaced.
  108. ///
  109. /// - Parameters:
  110. /// - descriptor: A descriptor for the method to register a handler for.
  111. /// - deserializer: A deserializer to deserialize input messages received from the client.
  112. /// - serializer: A serializer to serialize output messages to send to the client.
  113. /// - handler: The function which handles the request and returns a response.
  114. @inlinable
  115. public mutating func registerHandler<Input: Sendable, Output: Sendable>(
  116. forMethod descriptor: MethodDescriptor,
  117. deserializer: some MessageDeserializer<Input>,
  118. serializer: some MessageSerializer<Output>,
  119. handler: @Sendable @escaping (
  120. _ request: StreamingServerRequest<Input>,
  121. _ context: ServerContext
  122. ) async throws -> StreamingServerResponse<Output>
  123. ) {
  124. let handler = RPCHandler(
  125. method: descriptor,
  126. deserializer: deserializer,
  127. serializer: serializer,
  128. handler: handler
  129. )
  130. self.handlers[descriptor] = (handler, [])
  131. }
  132. /// Removes any handler registered for the specified method.
  133. ///
  134. /// - Parameter descriptor: A descriptor of the method to remove a handler for.
  135. /// - Returns: Whether a handler was removed.
  136. @discardableResult
  137. public mutating func removeHandler(forMethod descriptor: MethodDescriptor) -> Bool {
  138. return self.handlers.removeValue(forKey: descriptor) != nil
  139. }
  140. /// Registers applicable interceptors to all currently-registered handlers.
  141. ///
  142. /// - Important: Calling this method will apply the interceptors only to existing handlers. Any handlers registered via
  143. /// ``registerHandler(forMethod:deserializer:serializer:handler:)`` _after_ calling this method will not have
  144. /// any interceptors applied to them. If you want to make sure all registered methods have any applicable interceptors applied,
  145. /// only call this method _after_ you have registered all handlers.
  146. /// - Parameter pipeline: The interceptor pipeline operations to register to all currently-registered handlers. The order of the
  147. /// interceptors matters.
  148. @inlinable
  149. public mutating func registerInterceptors(
  150. pipeline: [ConditionalInterceptor<any ServerInterceptor>]
  151. ) {
  152. for descriptor in self.handlers.keys {
  153. let applicableOperations = pipeline.filter { $0.applies(to: descriptor) }
  154. if !applicableOperations.isEmpty {
  155. self.handlers[descriptor]?.interceptors = applicableOperations.map { $0.interceptor }
  156. }
  157. }
  158. }
  159. }
  160. @available(gRPCSwift 2.0, *)
  161. extension RPCRouter {
  162. internal func handle(
  163. stream: RPCStream<
  164. RPCAsyncSequence<RPCRequestPart<Transport.Bytes>, any Error>,
  165. RPCWriter<RPCResponsePart<Transport.Bytes>>.Closable
  166. >,
  167. context: ServerContext
  168. ) async {
  169. if let (handler, interceptors) = self.handlers[stream.descriptor] {
  170. await handler.handle(stream: stream, context: context, interceptors: interceptors)
  171. } else {
  172. // If this throws then the stream must be closed which we can't do anything about, so ignore
  173. // any error.
  174. try? await stream.outbound.write(.status(.rpcNotImplemented, [:]))
  175. await stream.outbound.finish()
  176. }
  177. }
  178. }
  179. @available(gRPCSwift 2.0, *)
  180. extension Status {
  181. fileprivate static let rpcNotImplemented = Status(
  182. code: .unimplemented,
  183. message: "Requested RPC isn't implemented by this server."
  184. )
  185. }