Skip to content

Commit

Permalink
record loadbalance time for sofa rpc.
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed May 9, 2024
1 parent 92b640f commit eb8ca85
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.jd.live.agent.core.instance.Application;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.invoke.loadbalance.LoadBalancer;
import com.jd.live.agent.governance.invoke.matcher.TagMatcher;
import com.jd.live.agent.governance.invoke.retry.RetrierFactory;
import com.jd.live.agent.governance.policy.PolicySupplier;
Expand Down Expand Up @@ -132,4 +133,88 @@ default VariableFunction getVariableFunction(String name) {
*/
Map<String, TagMatcher> getTagMatchers();

/**
* etrieves the {@code LoadBalancer} instance associated with the specified name,
* * or returns the default loadbalancer instance if no loadbalancer is found with that name.
*
* @param name The name of the loadbalancer.
* @return the {@code LoadBalancer} instance associated with the given name, or the
* default loadbalancer if no matching name is found.
*/
LoadBalancer getOrDefaultLoadBalancer(String name);

/**
* A delegate class for {@link InvocationContext} that forwards all its operations to another {@link InvocationContext} instance.
* This class acts as a wrapper or intermediary, allowing for additional behaviors to be inserted before or after
* the delegation of method calls. It implements the {@link InvocationContext} interface and can be used
* anywhere an InvocationContext is required, providing a flexible mechanism for extending or modifying the behavior
* of invocation contexts dynamically.
*
* <p>This delegate pattern is particularly useful for adding cross-cutting concerns like logging, monitoring,
* or security checks in a transparent manner, without altering the original behavior of the invocation context.</p>
*
* @see InvocationContext
*/
class InvocationContextDelegate implements InvocationContext {

/**
* The {@link InvocationContext} instance to which this delegate will forward all method calls.
*/
protected final InvocationContext delegate;

/**
* Constructs a new {@code InvocationContextDelegate} with a specified {@link InvocationContext} to delegate to.
*
* @param delegate The {@link InvocationContext} instance that this delegate will forward calls to.
*/
public InvocationContextDelegate(InvocationContext delegate) {
this.delegate = delegate;
}

@Override
public Application getApplication() {
return delegate.getApplication();
}

@Override
public GovernanceConfig getGovernanceConfig() {
return delegate.getGovernanceConfig();
}

@Override
public PolicySupplier getPolicySupplier() {
return delegate.getPolicySupplier();
}

@Override
public Map<String, UnitFunction> getUnitFunctions() {
return delegate.getUnitFunctions();
}

@Override
public Map<String, VariableFunction> getVariableFunctions() {
return delegate.getVariableFunctions();
}

@Override
public Map<String, VariableParser<?, ?>> getVariableParsers() {
return delegate.getVariableParsers();
}

@Override
public RetrierFactory getOrDefaultRetrierFactory(String name) {
return delegate.getOrDefaultRetrierFactory(name);
}

@Override
public Map<String, TagMatcher> getTagMatchers() {
return delegate.getTagMatchers();
}

@Override
public LoadBalancer getOrDefaultLoadBalancer(String name) {
return delegate.getOrDefaultLoadBalancer(name);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,8 @@
*/
package com.jd.live.agent.governance.invoke.filter.route;

import com.jd.live.agent.bootstrap.classloader.ResourcerType;
import com.jd.live.agent.core.extension.annotation.ConditionalOnProperty;
import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.InjectLoader;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.context.RequestContext;
import com.jd.live.agent.governance.instance.Endpoint;
Expand All @@ -29,7 +25,6 @@
import com.jd.live.agent.governance.invoke.filter.RouteFilter;
import com.jd.live.agent.governance.invoke.filter.RouteFilterChain;
import com.jd.live.agent.governance.invoke.loadbalance.LoadBalancer;
import com.jd.live.agent.governance.invoke.loadbalance.randomweight.RandomWeightLoadBalancer;
import com.jd.live.agent.governance.policy.service.ServicePolicy;
import com.jd.live.agent.governance.policy.service.loadbalance.LoadBalancePolicy;
import com.jd.live.agent.governance.policy.service.loadbalance.StickyType;
Expand All @@ -39,23 +34,17 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* LoadBalanceFilter applies load balancing to the list of route targets. It ensures that
* requests are distributed across available instances in a balanced manner based on the
* configured load balancing policy.
*/
@Injectable
@Extension(value = "LoadBalanceFilter", order = RouteFilter.ORDER_LOADBALANCE)
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_FLOW_CONTROL_ENABLED, matchIfMissing = true)
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LOADBALANCE_ENABLED, matchIfMissing = true)
public class LoadBalanceFilter implements RouteFilter {

@Inject
@InjectLoader(ResourcerType.CORE_IMPL)
private Map<String, LoadBalancer> loadBalancers;

@Override
public <T extends ServiceRequest.OutboundRequest> void filter(OutboundInvocation<T> invocation, RouteFilterChain chain) {
RouteTarget target = invocation.getRouteTarget();
Expand Down Expand Up @@ -136,10 +125,8 @@ private List<? extends Endpoint> preferSticky(RouteTarget target) {
private LoadBalancer getLoadBalancer(OutboundInvocation<?> invocation) {
ServicePolicy servicePolicy = invocation.getServiceMetadata().getServicePolicy();
LoadBalancePolicy loadBalancePolicy = servicePolicy == null ? null : servicePolicy.getLoadBalancePolicy();
LoadBalancer loadBalancer = loadBalancePolicy == null ? null : loadBalancers.get(loadBalancePolicy.getPolicyType());
// If no load balancer is found, use a default random-weight load balancer
loadBalancer = loadBalancer == null ? RandomWeightLoadBalancer.INSTANCE : loadBalancer;
return loadBalancer;
String policyType = loadBalancePolicy == null ? null : loadBalancePolicy.getPolicyType();
return invocation.getContext().getOrDefaultLoadBalancer(policyType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
@Extensible("LoadBalancer")
public interface LoadBalancer {

int ORDER_RANDOM_WEIGHT = 0;

int ORDER_ROUND_ROBIN = ORDER_RANDOM_WEIGHT + 1;

/**
* Chooses an endpoint from a list based on the invocation context. If the list is null or empty,
* returns null. If the list contains only one endpoint, returns that endpoint. Otherwise,
Expand Down Expand Up @@ -59,4 +63,47 @@ default <T extends Endpoint> T choose(List<T> endpoints, Invocation<?> invocatio
*/
<T extends Endpoint> T doSelect(List<T> endpoints, Invocation<?> invocation);

/**
* A delegate class for load balancing that forwards all its operations to another {@link LoadBalancer} instance.
* This class acts as a wrapper or intermediary, allowing for additional behaviors to be inserted before or after
* the delegation of the load balancing task. It implements the {@link LoadBalancer} interface and can be used
* anywhere a LoadBalancer is required, providing a flexible mechanism for extending or modifying load balancing
* behavior dynamically.
*
* @see LoadBalancer
*/
class LoadBalancerDelegate implements LoadBalancer {

/**
* The {@link LoadBalancer} instance to which this delegate will forward all method calls.
*/
protected LoadBalancer delegate;

/**
* Constructs a new {@code LoadBalancerDelegate} with a specified {@link LoadBalancer} to delegate to.
*
* @param delegate The {@link LoadBalancer} instance that this delegate will forward calls to.
*/
public LoadBalancerDelegate(LoadBalancer delegate) {
this.delegate = delegate;
}

/**
* Delegates the selection of an endpoint to the underlying {@link LoadBalancer} instance. This method
* is called to select an appropriate {@link Endpoint} from a list of available endpoints based on the
* current load balancing strategy.
*
* @param <T> The type of {@link Endpoint} to be selected.
* @param endpoints A list of available endpoints from which one will be selected.
* @param invocation The invocation context, which may contain metadata or other information used in the
* selection process.
* @return The selected {@link Endpoint}, or {@code null} if no suitable endpoint could be found.
*/
@Override
public <T extends Endpoint> T doSelect(List<T> endpoints, Invocation<?> invocation) {
return delegate.doSelect(endpoints, invocation);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*
* @since 1.0.0
*/
@Extension(value = RandomWeightLoadBalancer.LOAD_BALANCER_NAME)
@Extension(value = RandomWeightLoadBalancer.LOAD_BALANCER_NAME, order = LoadBalancer.ORDER_RANDOM_WEIGHT)
public class RandomWeightLoadBalancer implements LoadBalancer {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* the round-robin counter is scoped to that policy, ensuring that balancing is consistent
* within the context of the policy.
*/
@Extension(value = RoundRobinLoadBalancer.LOAD_BALANCER_NAME)
@Extension(value = RoundRobinLoadBalancer.LOAD_BALANCER_NAME, order = LoadBalancer.ORDER_ROUND_ROBIN)
public class RoundRobinLoadBalancer implements LoadBalancer {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.jd.live.agent.governance.config.LiveConfig;
import com.jd.live.agent.governance.config.ServiceConfig;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.governance.invoke.loadbalance.LoadBalancer;
import com.jd.live.agent.governance.invoke.matcher.TagMatcher;
import com.jd.live.agent.governance.invoke.retry.RetrierFactory;
import com.jd.live.agent.governance.policy.variable.UnitFunction;
Expand Down Expand Up @@ -103,6 +104,16 @@ public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, Ex
@InjectLoader(ResourcerType.CORE_IMPL)
private Map<String, TagMatcher> tagMatchers;

@Getter
@Inject
@InjectLoader(ResourcerType.CORE_IMPL)
private Map<String, LoadBalancer> loadBalancers;

@Getter
@Inject
@InjectLoader(ResourcerType.CORE_IMPL)
private LoadBalancer loadBalancer;

private final AtomicInteger counter = new AtomicInteger(0);

private final CompletableFuture<Void> future = new CompletableFuture<>();
Expand All @@ -123,6 +134,12 @@ public RetrierFactory getOrDefaultRetrierFactory(String name) {
return result == null ? retrierFactory : result;
}

@Override
public LoadBalancer getOrDefaultLoadBalancer(String name) {
LoadBalancer result = loadBalancers == null || name == null ? null : loadBalancers.get(name);
return result == null ? loadBalancer : result;
}

@Override
public boolean update(GovernancePolicy expect, GovernancePolicy update) {
// live policy is updated by a few services.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
Expand All @@ -25,6 +26,8 @@

import java.util.List;

import static com.alipay.sofa.rpc.common.RpcConstants.INTERNAL_KEY_CLIENT_ROUTER_TIME_NANO;

/**
* Represents a live cluster that manages sticky sessions for RPC requests.
* This class is responsible for maintaining a consistent request routing strategy
Expand Down Expand Up @@ -84,7 +87,10 @@ public void setStickyId(String stickyId) {
* @return a list of ProviderInfo objects that can potentially handle the request
*/
public List<ProviderInfo> route(SofaRequest request) {
return cluster.getRouterChain().route(request, null);
long routerStartTime = System.nanoTime();
List<ProviderInfo> result = cluster.getRouterChain().route(request, null);
RpcInvokeContext.getContext().put(INTERNAL_KEY_CLIENT_ROUTER_TIME_NANO, System.nanoTime() - routerStartTime);
return result;
}

/**
Expand Down
Loading

0 comments on commit eb8ca85

Please sign in to comment.