2
0
Эх сурвалжийг харах

Stop sending requests in call init (#848)

Motivation:

Previously @lukasa noted that we did some networking in the call init's
which isn't ideal. I forgot that calls are made though an internal
class, so it is indeed possible to move the networking calls.

Modifications:

- Move networking calls to just after the call object is created
- Shuffling code about, no real functional changes

Result:

- No networking in the *Call inits
- Cleaner inits
- Nothing functional
George Barnett 5 жил өмнө
parent
commit
1f272f8ec7

+ 22 - 25
Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift

@@ -134,40 +134,37 @@ public final class BidirectionalStreamingCall<
   }
 
   internal init(
-    path: String,
-    scheme: String,
-    authority: String,
-    callOptions: CallOptions,
-    eventLoop: EventLoop,
+    transport: ChannelTransport<RequestPayload, ResponsePayload>,
+    options: CallOptions
+  ) {
+    self.transport = transport
+    self.options = options
+  }
+
+  internal func sendHead(_ head: _GRPCRequestHead) {
+    self.transport.sendRequest(.head(head), promise: nil)
+  }
+}
+
+extension BidirectionalStreamingCall {
+  internal static func makeOnHTTP2Stream(
     multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
+    callOptions: CallOptions,
     errorDelegate: ClientErrorDelegate?,
     logger: Logger,
-    handler: @escaping (ResponsePayload) -> Void
-  ) {
-    let requestID = callOptions.requestIDProvider.requestID()
-    var logger = logger
-    logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
-    logger.debug("starting rpc", metadata: ["path": "\(path)"])
-
-    self.transport = ChannelTransport(
+    responseHandler: @escaping (ResponsePayload) -> Void
+  ) -> BidirectionalStreamingCall<RequestPayload, ResponsePayload> {
+    let eventLoop = multiplexer.eventLoop
+    let transport = ChannelTransport<RequestPayload, ResponsePayload>(
       multiplexer: multiplexer,
-      responseContainer: .init(eventLoop: eventLoop, streamingResponseHandler: handler),
+      responseContainer: .init(eventLoop: eventLoop, streamingResponseHandler: responseHandler),
       callType: .bidirectionalStreaming,
       timeLimit: callOptions.timeLimit,
       errorDelegate: errorDelegate,
       logger: logger
     )
 
-    self.options = callOptions
-
-    let requestHead = _GRPCRequestHead(
-      scheme: scheme,
-      path: path,
-      host: authority,
-      requestID: requestID,
-      options: callOptions
-    )
-
-    self.transport.sendRequest(.head(requestHead), promise: nil)
+    return BidirectionalStreamingCall(transport: transport, options: callOptions)
   }
 }
+

+ 21 - 24
Sources/GRPC/ClientCalls/ClientStreamingCall.swift

@@ -137,22 +137,30 @@ public final class ClientStreamingCall<
   }
 
   internal init(
-    path: String,
-    scheme: String,
-    authority: String,
-    callOptions: CallOptions,
-    eventLoop: EventLoop,
+    response: EventLoopFuture<ResponsePayload>,
+    transport: ChannelTransport<RequestPayload, ResponsePayload>,
+    options: CallOptions
+  ) {
+    self.response = response
+    self.transport = transport
+    self.options = options
+  }
+
+  internal func sendHead(_ head: _GRPCRequestHead) {
+    self.transport.sendRequest(.head(head), promise: nil)
+  }
+}
+
+extension ClientStreamingCall {
+  internal static func makeOnHTTP2Stream(
     multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
+    callOptions: CallOptions,
     errorDelegate: ClientErrorDelegate?,
     logger: Logger
-  ) {
-    let requestID = callOptions.requestIDProvider.requestID()
-    var logger = logger
-    logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
-    logger.debug("starting rpc", metadata: ["path": "\(path)"])
-
+  ) -> ClientStreamingCall<RequestPayload, ResponsePayload> {
+    let eventLoop = multiplexer.eventLoop
     let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
-    self.transport = ChannelTransport(
+    let transport = ChannelTransport<RequestPayload, ResponsePayload>(
       multiplexer: multiplexer,
       responseContainer: .init(eventLoop: eventLoop, unaryResponsePromise: responsePromise),
       callType: .clientStreaming,
@@ -160,17 +168,6 @@ public final class ClientStreamingCall<
       errorDelegate: errorDelegate,
       logger: logger
     )
-    self.options = callOptions
-    self.response = responsePromise.futureResult
-
-    let requestHead = _GRPCRequestHead(
-      scheme: scheme,
-      path: path,
-      host: authority,
-      requestID: requestID,
-      options: callOptions
-    )
-
-    self.transport.sendRequest(.head(requestHead), promise: nil)
+    return ClientStreamingCall(response: responsePromise.futureResult, transport: transport, options: callOptions)
   }
 }

+ 23 - 32
Sources/GRPC/ClientCalls/ServerStreamingCall.swift

@@ -78,47 +78,38 @@ public final class ServerStreamingCall<
       }
     }
   }
-  
-  init(
-    path: String,
-    scheme: String,
-    authority: String,
-    callOptions: CallOptions,
-    eventLoop: EventLoop,
+
+  internal init(
+    transport: ChannelTransport<RequestPayload, ResponsePayload>,
+    options: CallOptions
+  ) {
+    self.transport = transport
+    self.options = options
+  }
+
+  internal func send(_ head: _GRPCRequestHead, request: RequestPayload) {
+    self.transport.sendUnary(head, request: request, compressed: self.options.messageEncoding.enabledForRequests)
+  }
+}
+
+extension ServerStreamingCall {
+  internal static func makeOnHTTP2Stream(
     multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
+    callOptions: CallOptions,
     errorDelegate: ClientErrorDelegate?,
     logger: Logger,
-    request: RequestPayload,
-    handler: @escaping (ResponsePayload) -> Void
-  ) {
-    let requestID = callOptions.requestIDProvider.requestID()
-    var logger = logger
-    logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
-    logger[metadataKey: "path"] = "\(path)"
-
-    self.transport = ChannelTransport(
+    responseHandler: @escaping (ResponsePayload) -> Void
+  ) -> ServerStreamingCall<RequestPayload, ResponsePayload> {
+    let eventLoop = multiplexer.eventLoop
+    let transport = ChannelTransport<RequestPayload, ResponsePayload>(
       multiplexer: multiplexer,
-      responseContainer: .init(eventLoop: eventLoop, streamingResponseHandler: handler),
+      responseContainer: .init(eventLoop: eventLoop, streamingResponseHandler: responseHandler),
       callType: .serverStreaming,
       timeLimit: callOptions.timeLimit,
       errorDelegate: errorDelegate,
       logger: logger
     )
 
-    self.options = callOptions
-
-    let requestHead = _GRPCRequestHead(
-      scheme: scheme,
-      path: path,
-      host: authority,
-      requestID: requestID,
-      options: callOptions
-    )
-
-    self.transport.sendUnary(
-      requestHead,
-      request: request,
-      compressed: callOptions.messageEncoding.enabledForRequests
-    )
+    return ServerStreamingCall(transport: transport, options: callOptions)
   }
 }

+ 22 - 31
Sources/GRPC/ClientCalls/UnaryCall.swift

@@ -85,23 +85,30 @@ public final class UnaryCall<
   }
 
   internal init(
-    path: String,
-    scheme: String,
-    authority: String,
-    callOptions: CallOptions,
-    eventLoop: EventLoop,
-    multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
-    errorDelegate: ClientErrorDelegate?,
-    logger: Logger,
-    request: RequestPayload
+    response: EventLoopFuture<ResponsePayload>,
+    transport: ChannelTransport<RequestPayload, ResponsePayload>,
+    options: CallOptions
   ) {
-    let requestID = callOptions.requestIDProvider.requestID()
-    var logger = logger
-    logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
-    logger[metadataKey: "path"] = "\(path)"
+    self.response = response
+    self.transport = transport
+    self.options = options
+  }
 
+  internal func send(_ head: _GRPCRequestHead, request: RequestPayload) {
+    self.transport.sendUnary(head, request: request, compressed: self.options.messageEncoding.enabledForRequests)
+  }
+}
+
+extension UnaryCall {
+  internal static func makeOnHTTP2Stream(
+    multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
+    callOptions: CallOptions,
+    errorDelegate: ClientErrorDelegate?,
+    logger: Logger
+  ) -> UnaryCall<RequestPayload, ResponsePayload> {
+    let eventLoop = multiplexer.eventLoop
     let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
-    self.transport = ChannelTransport(
+    let transport = ChannelTransport<RequestPayload, ResponsePayload>(
       multiplexer: multiplexer,
       responseContainer: .init(eventLoop: eventLoop, unaryResponsePromise: responsePromise),
       callType: .unary,
@@ -109,22 +116,6 @@ public final class UnaryCall<
       errorDelegate: errorDelegate,
       logger: logger
     )
-
-    self.options = callOptions
-    self.response = responsePromise.futureResult
-
-    let requestHead = _GRPCRequestHead(
-      scheme: scheme,
-      path: path,
-      host: authority,
-      requestID: requestID,
-      options: callOptions
-    )
-
-    self.transport.sendUnary(
-      requestHead,
-      request: request,
-      compressed: callOptions.messageEncoding.enabledForRequests
-    )
+    return UnaryCall(response: responsePromise.futureResult, transport: transport, options: callOptions)
   }
 }

+ 64 - 34
Sources/GRPC/ClientConnection.swift

@@ -115,6 +115,22 @@ public class ClientConnection {
   public func close() -> EventLoopFuture<Void> {
     return self.connectionManager.shutdown()
   }
+
+  private func loggerWithRequestID(_ requestID: String) -> Logger {
+    var logger = self.connectionManager.logger
+    logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
+    return logger
+  }
+
+  private func makeRequestHead(path: String, options: CallOptions, requestID: String) -> _GRPCRequestHead {
+    return _GRPCRequestHead(
+      scheme: self.scheme,
+      path: path,
+      host: self.authority,
+      requestID: requestID,
+      options: options
+    )
+  }
 }
 
 // Note: documentation is inherited.
@@ -124,33 +140,40 @@ extension ClientConnection: GRPCChannel {
     request: Request,
     callOptions: CallOptions
   ) -> UnaryCall<Request, Response> where Request : GRPCPayload, Response : GRPCPayload {
-    return UnaryCall(
-      path: path,
-      scheme: self.scheme,
-      authority: self.authority,
-      callOptions: callOptions,
-      eventLoop: self.eventLoop,
+    let requestID = callOptions.requestIDProvider.requestID()
+    let logger = self.loggerWithRequestID(requestID)
+    logger.debug("starting rpc", metadata: ["path": "\(path)"])
+
+    let call = UnaryCall<Request, Response>.makeOnHTTP2Stream(
       multiplexer: self.multiplexer,
+      callOptions: callOptions,
       errorDelegate: self.configuration.errorDelegate,
-      logger: self.connectionManager.logger,
-      request: request
+      logger: logger
     )
+
+    call.send(self.makeRequestHead(path: path, options: callOptions, requestID: requestID), request: request)
+
+    return call
   }
 
   public func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
     path: String,
     callOptions: CallOptions
   ) -> ClientStreamingCall<Request, Response> {
-    return ClientStreamingCall(
-      path: path,
-      scheme: self.scheme,
-      authority: self.authority,
-      callOptions: callOptions,
-      eventLoop: self.eventLoop,
+    let requestID = callOptions.requestIDProvider.requestID()
+    let logger = self.loggerWithRequestID(requestID)
+    logger.debug("starting rpc", metadata: ["path": "\(path)"])
+
+    let call = ClientStreamingCall<Request, Response>.makeOnHTTP2Stream(
       multiplexer: self.multiplexer,
+      callOptions: callOptions,
       errorDelegate: self.configuration.errorDelegate,
-      logger: self.connectionManager.logger
+      logger: logger
     )
+
+    call.sendHead(self.makeRequestHead(path: path, options: callOptions, requestID: requestID))
+
+    return call
   }
 
   public func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
@@ -159,18 +182,21 @@ extension ClientConnection: GRPCChannel {
     callOptions: CallOptions,
     handler: @escaping (Response) -> Void
   ) -> ServerStreamingCall<Request, Response> {
-    return ServerStreamingCall(
-      path: path,
-      scheme: self.scheme,
-      authority: self.authority,
+    let requestID = callOptions.requestIDProvider.requestID()
+    let logger = self.loggerWithRequestID(requestID)
+    logger.debug("starting rpc", metadata: ["path": "\(path)"])
+
+    let call = ServerStreamingCall<Request, Response>.makeOnHTTP2Stream(
+      multiplexer: multiplexer,
       callOptions: callOptions,
-      eventLoop: self.eventLoop,
-      multiplexer: self.multiplexer,
       errorDelegate: self.configuration.errorDelegate,
-      logger: self.connectionManager.logger,
-      request: request,
-      handler: handler
+      logger: logger,
+      responseHandler: handler
     )
+
+    call.send(self.makeRequestHead(path: path, options: callOptions, requestID: requestID), request: request)
+
+    return call
   }
 
   public func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
@@ -178,17 +204,21 @@ extension ClientConnection: GRPCChannel {
     callOptions: CallOptions,
     handler: @escaping (Response) -> Void
   ) -> BidirectionalStreamingCall<Request, Response> {
-    return BidirectionalStreamingCall(
-      path: path,
-      scheme: self.scheme,
-      authority: self.authority,
+    let requestID = callOptions.requestIDProvider.requestID()
+    let logger = self.loggerWithRequestID(requestID)
+    logger.debug("starting rpc", metadata: ["path": "\(path)"])
+
+    let call = BidirectionalStreamingCall<Request, Response>.makeOnHTTP2Stream(
+      multiplexer: multiplexer,
       callOptions: callOptions,
-      eventLoop: self.eventLoop,
-      multiplexer: self.multiplexer,
       errorDelegate: self.configuration.errorDelegate,
-      logger: self.connectionManager.logger,
-      handler: handler
+      logger: logger,
+      responseHandler: handler
     )
+
+    call.sendHead(self.makeRequestHead(path: path, options: callOptions, requestID: requestID))
+
+    return call
   }
 }
 
@@ -250,7 +280,7 @@ extension ClientConnection {
     ///
     /// If a connection becomes idle, starting a new RPC will automatically create a new connection.
     public var connectionIdleTimeout: TimeAmount
-    
+
     /// The HTTP/2 flow control target window size.
     public var httpTargetWindowSize: Int
 
@@ -400,7 +430,7 @@ extension String {
     // We need some scratch space to let inet_pton write into.
     var ipv4Addr = in_addr()
     var ipv6Addr = in6_addr()
-    
+
     return self.withCString { ptr in
       return inet_pton(AF_INET, ptr, &ipv4Addr) == 1 ||
         inet_pton(AF_INET6, ptr, &ipv6Addr) == 1

+ 42 - 34
Sources/GRPC/GRPCChannel/EmbeddedGRPCChannel.swift

@@ -51,75 +51,83 @@ class EmbeddedGRPCChannel: GRPCChannel {
     self.errorDelegate = errorDelegate
   }
 
-  func makeUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
+  private func makeRequestHead(path: String, options: CallOptions) -> _GRPCRequestHead {
+    return _GRPCRequestHead(
+      scheme: self.scheme,
+      path: path,
+      host: self.authority,
+      requestID: options.requestIDProvider.requestID(),
+      options: options
+    )
+  }
+
+  internal func makeUnaryCall<Request: GRPCPayload, Response: GRPCPayload>(
     path: String,
     request: Request,
     callOptions: CallOptions
-  ) -> UnaryCall<Request, Response> where Request : GRPCPayload, Response : GRPCPayload {
-    return UnaryCall(
-      path: path,
-      scheme: self.scheme,
-      authority: self.authority,
-      callOptions: callOptions,
-      eventLoop: self.eventLoop,
+  ) -> UnaryCall<Request, Response> {
+    let call = UnaryCall<Request, Response>.makeOnHTTP2Stream(
       multiplexer: self.multiplexer,
+      callOptions: callOptions,
       errorDelegate: self.errorDelegate,
-      logger: self.logger,
-      request: request
+      logger: self.logger
     )
+
+    call.send(self.makeRequestHead(path: path, options: callOptions), request: request)
+
+    return call
   }
 
-  func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
+  internal func makeClientStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
     path: String,
     callOptions: CallOptions
   ) -> ClientStreamingCall<Request, Response> {
-    return ClientStreamingCall(
-      path: path,
-      scheme: self.scheme,
-      authority: self.authority,
-      callOptions: callOptions,
-      eventLoop: self.eventLoop,
+    let call = ClientStreamingCall<Request, Response>.makeOnHTTP2Stream(
       multiplexer: self.multiplexer,
+      callOptions: callOptions,
       errorDelegate: self.errorDelegate,
       logger: self.logger
     )
+
+    call.sendHead(self.makeRequestHead(path: path, options: callOptions))
+
+    return call
   }
 
-  func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
+  internal func makeServerStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
     path: String,
     request: Request,
     callOptions: CallOptions,
     handler: @escaping (Response) -> Void
   ) -> ServerStreamingCall<Request, Response> {
-    return ServerStreamingCall(
-      path: path,
-      scheme: self.scheme,
-      authority: self.authority,
-      callOptions: callOptions,
-      eventLoop: self.eventLoop,
+    let call = ServerStreamingCall<Request, Response>.makeOnHTTP2Stream(
       multiplexer: self.multiplexer,
+      callOptions: callOptions,
       errorDelegate: self.errorDelegate,
       logger: self.logger,
-      request: request,
-      handler: handler
+      responseHandler: handler
     )
+
+    call.send(self.makeRequestHead(path: path, options: callOptions), request: request)
+
+    return call
   }
 
-  func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
+  internal func makeBidirectionalStreamingCall<Request: GRPCPayload, Response: GRPCPayload>(
     path: String,
     callOptions: CallOptions,
     handler: @escaping (Response) -> Void
   ) -> BidirectionalStreamingCall<Request, Response> {
-    return BidirectionalStreamingCall(
-      path: path,
-      scheme: self.scheme,
-      authority: self.authority,
-      callOptions: callOptions,
-      eventLoop: self.eventLoop,
+    let call = BidirectionalStreamingCall<Request, Response>.makeOnHTTP2Stream(
       multiplexer: self.multiplexer,
+      callOptions: callOptions,
       errorDelegate: self.errorDelegate,
       logger: self.logger,
-      handler: handler
+      responseHandler: handler
     )
+
+    call.sendHead(self.makeRequestHead(path: path, options: callOptions))
+
+    return call
   }
 }