diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/InvocationContext.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/InvocationContext.java index 9c2ed114..dde71d34 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/InvocationContext.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/InvocationContext.java @@ -17,6 +17,7 @@ import com.jd.live.agent.core.instance.Application; import com.jd.live.agent.governance.config.GovernanceConfig; +import com.jd.live.agent.governance.invoke.loadbalance.LoadBalancer; import com.jd.live.agent.governance.invoke.matcher.TagMatcher; import com.jd.live.agent.governance.invoke.retry.RetrierFactory; import com.jd.live.agent.governance.policy.PolicySupplier; @@ -132,4 +133,88 @@ default VariableFunction getVariableFunction(String name) { */ Map getTagMatchers(); + /** + * etrieves the {@code LoadBalancer} instance associated with the specified name, + * * or returns the default loadbalancer instance if no loadbalancer is found with that name. + * + * @param name The name of the loadbalancer. + * @return the {@code LoadBalancer} instance associated with the given name, or the + * default loadbalancer if no matching name is found. + */ + LoadBalancer getOrDefaultLoadBalancer(String name); + + /** + * A delegate class for {@link InvocationContext} that forwards all its operations to another {@link InvocationContext} instance. + * This class acts as a wrapper or intermediary, allowing for additional behaviors to be inserted before or after + * the delegation of method calls. It implements the {@link InvocationContext} interface and can be used + * anywhere an InvocationContext is required, providing a flexible mechanism for extending or modifying the behavior + * of invocation contexts dynamically. + * + *

This delegate pattern is particularly useful for adding cross-cutting concerns like logging, monitoring, + * or security checks in a transparent manner, without altering the original behavior of the invocation context.

+ * + * @see InvocationContext + */ + class InvocationContextDelegate implements InvocationContext { + + /** + * The {@link InvocationContext} instance to which this delegate will forward all method calls. + */ + protected final InvocationContext delegate; + + /** + * Constructs a new {@code InvocationContextDelegate} with a specified {@link InvocationContext} to delegate to. + * + * @param delegate The {@link InvocationContext} instance that this delegate will forward calls to. + */ + public InvocationContextDelegate(InvocationContext delegate) { + this.delegate = delegate; + } + + @Override + public Application getApplication() { + return delegate.getApplication(); + } + + @Override + public GovernanceConfig getGovernanceConfig() { + return delegate.getGovernanceConfig(); + } + + @Override + public PolicySupplier getPolicySupplier() { + return delegate.getPolicySupplier(); + } + + @Override + public Map getUnitFunctions() { + return delegate.getUnitFunctions(); + } + + @Override + public Map getVariableFunctions() { + return delegate.getVariableFunctions(); + } + + @Override + public Map> getVariableParsers() { + return delegate.getVariableParsers(); + } + + @Override + public RetrierFactory getOrDefaultRetrierFactory(String name) { + return delegate.getOrDefaultRetrierFactory(name); + } + + @Override + public Map getTagMatchers() { + return delegate.getTagMatchers(); + } + + @Override + public LoadBalancer getOrDefaultLoadBalancer(String name) { + return delegate.getOrDefaultLoadBalancer(name); + } + } + } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/filter/route/LoadBalanceFilter.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/filter/route/LoadBalanceFilter.java index ea75351d..638fa05b 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/filter/route/LoadBalanceFilter.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/filter/route/LoadBalanceFilter.java @@ -15,12 +15,8 @@ */ package com.jd.live.agent.governance.invoke.filter.route; -import com.jd.live.agent.bootstrap.classloader.ResourcerType; import com.jd.live.agent.core.extension.annotation.ConditionalOnProperty; import com.jd.live.agent.core.extension.annotation.Extension; -import com.jd.live.agent.core.inject.annotation.Inject; -import com.jd.live.agent.core.inject.annotation.InjectLoader; -import com.jd.live.agent.core.inject.annotation.Injectable; import com.jd.live.agent.governance.config.GovernanceConfig; import com.jd.live.agent.governance.context.RequestContext; import com.jd.live.agent.governance.instance.Endpoint; @@ -29,7 +25,6 @@ import com.jd.live.agent.governance.invoke.filter.RouteFilter; import com.jd.live.agent.governance.invoke.filter.RouteFilterChain; import com.jd.live.agent.governance.invoke.loadbalance.LoadBalancer; -import com.jd.live.agent.governance.invoke.loadbalance.randomweight.RandomWeightLoadBalancer; import com.jd.live.agent.governance.policy.service.ServicePolicy; import com.jd.live.agent.governance.policy.service.loadbalance.LoadBalancePolicy; import com.jd.live.agent.governance.policy.service.loadbalance.StickyType; @@ -39,23 +34,17 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; /** * LoadBalanceFilter applies load balancing to the list of route targets. It ensures that * requests are distributed across available instances in a balanced manner based on the * configured load balancing policy. */ -@Injectable @Extension(value = "LoadBalanceFilter", order = RouteFilter.ORDER_LOADBALANCE) @ConditionalOnProperty(value = GovernanceConfig.CONFIG_FLOW_CONTROL_ENABLED, matchIfMissing = true) @ConditionalOnProperty(value = GovernanceConfig.CONFIG_LOADBALANCE_ENABLED, matchIfMissing = true) public class LoadBalanceFilter implements RouteFilter { - @Inject - @InjectLoader(ResourcerType.CORE_IMPL) - private Map loadBalancers; - @Override public void filter(OutboundInvocation invocation, RouteFilterChain chain) { RouteTarget target = invocation.getRouteTarget(); @@ -136,10 +125,8 @@ private List preferSticky(RouteTarget target) { private LoadBalancer getLoadBalancer(OutboundInvocation invocation) { ServicePolicy servicePolicy = invocation.getServiceMetadata().getServicePolicy(); LoadBalancePolicy loadBalancePolicy = servicePolicy == null ? null : servicePolicy.getLoadBalancePolicy(); - LoadBalancer loadBalancer = loadBalancePolicy == null ? null : loadBalancers.get(loadBalancePolicy.getPolicyType()); - // If no load balancer is found, use a default random-weight load balancer - loadBalancer = loadBalancer == null ? RandomWeightLoadBalancer.INSTANCE : loadBalancer; - return loadBalancer; + String policyType = loadBalancePolicy == null ? null : loadBalancePolicy.getPolicyType(); + return invocation.getContext().getOrDefaultLoadBalancer(policyType); } } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/LoadBalancer.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/LoadBalancer.java index 61e9c245..1577b946 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/LoadBalancer.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/LoadBalancer.java @@ -29,6 +29,10 @@ @Extensible("LoadBalancer") public interface LoadBalancer { + int ORDER_RANDOM_WEIGHT = 0; + + int ORDER_ROUND_ROBIN = ORDER_RANDOM_WEIGHT + 1; + /** * Chooses an endpoint from a list based on the invocation context. If the list is null or empty, * returns null. If the list contains only one endpoint, returns that endpoint. Otherwise, @@ -59,4 +63,47 @@ default T choose(List endpoints, Invocation invocatio */ T doSelect(List endpoints, Invocation invocation); + /** + * A delegate class for load balancing that forwards all its operations to another {@link LoadBalancer} instance. + * This class acts as a wrapper or intermediary, allowing for additional behaviors to be inserted before or after + * the delegation of the load balancing task. It implements the {@link LoadBalancer} interface and can be used + * anywhere a LoadBalancer is required, providing a flexible mechanism for extending or modifying load balancing + * behavior dynamically. + * + * @see LoadBalancer + */ + class LoadBalancerDelegate implements LoadBalancer { + + /** + * The {@link LoadBalancer} instance to which this delegate will forward all method calls. + */ + protected LoadBalancer delegate; + + /** + * Constructs a new {@code LoadBalancerDelegate} with a specified {@link LoadBalancer} to delegate to. + * + * @param delegate The {@link LoadBalancer} instance that this delegate will forward calls to. + */ + public LoadBalancerDelegate(LoadBalancer delegate) { + this.delegate = delegate; + } + + /** + * Delegates the selection of an endpoint to the underlying {@link LoadBalancer} instance. This method + * is called to select an appropriate {@link Endpoint} from a list of available endpoints based on the + * current load balancing strategy. + * + * @param The type of {@link Endpoint} to be selected. + * @param endpoints A list of available endpoints from which one will be selected. + * @param invocation The invocation context, which may contain metadata or other information used in the + * selection process. + * @return The selected {@link Endpoint}, or {@code null} if no suitable endpoint could be found. + */ + @Override + public T doSelect(List endpoints, Invocation invocation) { + return delegate.doSelect(endpoints, invocation); + } + } + + } diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/randomweight/RandomWeightLoadBalancer.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/randomweight/RandomWeightLoadBalancer.java index e5bc1935..e9bf7157 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/randomweight/RandomWeightLoadBalancer.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/randomweight/RandomWeightLoadBalancer.java @@ -31,7 +31,7 @@ * * @since 1.0.0 */ -@Extension(value = RandomWeightLoadBalancer.LOAD_BALANCER_NAME) +@Extension(value = RandomWeightLoadBalancer.LOAD_BALANCER_NAME, order = LoadBalancer.ORDER_RANDOM_WEIGHT) public class RandomWeightLoadBalancer implements LoadBalancer { /** diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/roundrobin/RoundRobinLoadBalancer.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/roundrobin/RoundRobinLoadBalancer.java index 2f79c694..9aeb64be 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/roundrobin/RoundRobinLoadBalancer.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/invoke/loadbalance/roundrobin/RoundRobinLoadBalancer.java @@ -36,7 +36,7 @@ * the round-robin counter is scoped to that policy, ensuring that balancing is consistent * within the context of the policy. */ -@Extension(value = RoundRobinLoadBalancer.LOAD_BALANCER_NAME) +@Extension(value = RoundRobinLoadBalancer.LOAD_BALANCER_NAME, order = LoadBalancer.ORDER_ROUND_ROBIN) public class RoundRobinLoadBalancer implements LoadBalancer { /** diff --git a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyManager.java b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyManager.java index ba8904c9..e9f1b0cd 100644 --- a/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyManager.java +++ b/joylive-core/joylive-governance-api/src/main/java/com/jd/live/agent/governance/policy/PolicyManager.java @@ -35,6 +35,7 @@ import com.jd.live.agent.governance.config.LiveConfig; import com.jd.live.agent.governance.config.ServiceConfig; import com.jd.live.agent.governance.invoke.InvocationContext; +import com.jd.live.agent.governance.invoke.loadbalance.LoadBalancer; import com.jd.live.agent.governance.invoke.matcher.TagMatcher; import com.jd.live.agent.governance.invoke.retry.RetrierFactory; import com.jd.live.agent.governance.policy.variable.UnitFunction; @@ -103,6 +104,16 @@ public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, Ex @InjectLoader(ResourcerType.CORE_IMPL) private Map tagMatchers; + @Getter + @Inject + @InjectLoader(ResourcerType.CORE_IMPL) + private Map loadBalancers; + + @Getter + @Inject + @InjectLoader(ResourcerType.CORE_IMPL) + private LoadBalancer loadBalancer; + private final AtomicInteger counter = new AtomicInteger(0); private final CompletableFuture future = new CompletableFuture<>(); @@ -123,6 +134,12 @@ public RetrierFactory getOrDefaultRetrierFactory(String name) { return result == null ? retrierFactory : result; } + @Override + public LoadBalancer getOrDefaultLoadBalancer(String name) { + LoadBalancer result = loadBalancers == null || name == null ? null : loadBalancers.get(name); + return result == null ? loadBalancer : result; + } + @Override public boolean update(GovernancePolicy expect, GovernancePolicy update) { // live policy is updated by a few services. diff --git a/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/alipay/sofa/rpc/client/LiveCluster.java b/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/alipay/sofa/rpc/client/LiveCluster.java index 7d8f9ebc..65dff986 100644 --- a/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/alipay/sofa/rpc/client/LiveCluster.java +++ b/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/alipay/sofa/rpc/client/LiveCluster.java @@ -17,6 +17,7 @@ import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.context.RpcInvokeContext; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.core.response.SofaResponse; @@ -25,6 +26,8 @@ import java.util.List; +import static com.alipay.sofa.rpc.common.RpcConstants.INTERNAL_KEY_CLIENT_ROUTER_TIME_NANO; + /** * Represents a live cluster that manages sticky sessions for RPC requests. * This class is responsible for maintaining a consistent request routing strategy @@ -84,7 +87,10 @@ public void setStickyId(String stickyId) { * @return a list of ProviderInfo objects that can potentially handle the request */ public List route(SofaRequest request) { - return cluster.getRouterChain().route(request, null); + long routerStartTime = System.nanoTime(); + List result = cluster.getRouterChain().route(request, null); + RpcInvokeContext.getContext().put(INTERNAL_KEY_CLIENT_ROUTER_TIME_NANO, System.nanoTime() - routerStartTime); + return result; } /** diff --git a/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/interceptor/ClusterInterceptor.java b/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/interceptor/ClusterInterceptor.java index 948cbba1..571a5fa8 100644 --- a/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/interceptor/ClusterInterceptor.java +++ b/joylive-plugin/joylive-router/joylive-router-sofarpc/src/main/java/com/jd/live/agent/plugin/router/sofarpc/interceptor/ClusterInterceptor.java @@ -18,6 +18,8 @@ import com.alipay.sofa.rpc.client.AbstractCluster; import com.alipay.sofa.rpc.client.LiveCluster; import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.context.RpcInvokeContext; import com.alipay.sofa.rpc.core.exception.RpcErrorType; import com.alipay.sofa.rpc.core.exception.SofaRouteException; import com.alipay.sofa.rpc.core.exception.SofaRpcException; @@ -29,8 +31,10 @@ import com.jd.live.agent.bootstrap.exception.RejectException; import com.jd.live.agent.governance.instance.Endpoint; import com.jd.live.agent.governance.interceptor.AbstractInterceptor.AbstractRouteInterceptor; +import com.jd.live.agent.governance.invoke.Invocation; import com.jd.live.agent.governance.invoke.InvocationContext; import com.jd.live.agent.governance.invoke.filter.RouteFilter; +import com.jd.live.agent.governance.invoke.loadbalance.LoadBalancer; import com.jd.live.agent.governance.response.Response; import com.jd.live.agent.plugin.router.sofarpc.instance.SofaRpcEndpoint; import com.jd.live.agent.plugin.router.sofarpc.request.SofaRpcRequest.SofaRpcOutboundRequest; @@ -66,11 +70,9 @@ public void onEnter(ExecutableContext ctx) { MethodContext mc = (MethodContext) ctx; SofaRequest request = (SofaRequest) ctx.getArguments()[0]; LiveCluster cluster = clusters.computeIfAbsent((AbstractCluster) ctx.getTarget(), LiveCluster::new); - List invokers = cluster.route(request); - List instances = invokers.stream().map(e -> new SofaRpcEndpoint(e, cluster::isConnected)).collect(Collectors.toList()); SofaRpcOutboundRequest outboundRequest = new SofaRpcOutboundRequest(request, cluster); SofaRpcOutboundInvocation invocation = createOutlet(outboundRequest); - Response response = invokeWithRetry(invocation, () -> invoke(request, invocation, cluster, instances)); + Response response = invokeWithRetry(invocation, () -> invoke(request, invocation, cluster)); if (response.getThrowable() != null) { mc.setThrowable(response.getThrowable()); } else { @@ -96,16 +98,16 @@ public void onEnter(ExecutableContext ctx) { * @param request The {@link SofaRequest} representing the RPC call to be made. * @param invocation The {@link SofaRpcOutboundInvocation} context holding invocation details. * @param cluster The {@link LiveCluster} through which the request is to be executed. - * @param instances A list of {@link SofaRpcEndpoint} instances from which the endpoints are selected. * @return A {@link SofaRpcOutboundResponse} representing the outcome of the invocation. This response may * contain the result of the RPC call or an exception if the call was unsuccessful or no endpoints could be found. * @throws RejectException If the invocation is rejected by the routing logic or cluster configuration. */ private SofaRpcOutboundResponse invoke(SofaRequest request, SofaRpcOutboundInvocation invocation, - LiveCluster cluster, - List instances) { + LiveCluster cluster) { try { + List invokers = cluster.route(request); + List instances = invokers.stream().map(e -> new SofaRpcEndpoint(e, cluster::isConnected)).collect(Collectors.toList()); invocation.setInstances(instances); List endpoints = routing(invocation); if (endpoints != null && !endpoints.isEmpty()) { @@ -126,7 +128,7 @@ private SofaRpcOutboundResponse invoke(SofaRequest request, @Override protected SofaRpcOutboundInvocation createOutlet(SofaRpcOutboundRequest request) { - return new SofaRpcOutboundInvocation(request, context); + return new SofaRpcOutboundInvocation(request, new SofaRpcInvocationContext(context)); } /** @@ -156,4 +158,60 @@ private boolean isRetryable(Response response) { return false; } } + + /** + * A specialized {@link InvocationContextDelegate} designed for use within the SOFA RPC environment. + * This class overrides the {@code getLoadBalancer} method to return an instance of {@link SofaRpcLoadBalancer}, + * effectively customizing the load balancing strategy for SOFA RPC invocations. + * + *

The {@code SofaRpcInvocationContext} serves as an extension to the standard invocation context, providing + * a mechanism to utilize a custom load balancer that is specifically tailored for handling the nuances and + * requirements of load balancing in SOFA RPC services. This allows for enhanced control over service invocation + * and routing, potentially improving performance, reliability, and service discovery in distributed SOFA RPC + * environments.

+ * + * @see InvocationContextDelegate + * @see LoadBalancer + */ + private static class SofaRpcInvocationContext extends InvocationContext.InvocationContextDelegate { + + public SofaRpcInvocationContext(InvocationContext delegate) { + super(delegate); + } + + @Override + public LoadBalancer getOrDefaultLoadBalancer(String name) { + return new SofaRpcLoadBalancer(super.getOrDefaultLoadBalancer(name)); + } + } + + /** + * A specialized {@link LoadBalancer.LoadBalancerDelegate} designed for use within the SOFA RPC framework. + * This class overrides the {@code doSelect} method to add functionality for measuring the time taken + * to select an endpoint from a list of available endpoints. The selection time is then recorded in the + * {@link RpcInvokeContext} for monitoring, debugging, or other purposes. + * + *

The addition of timing logic allows for the observation and analysis of load balancing performance, + * potentially aiding in the optimization of service discovery and request routing within a distributed + * SOFA RPC environment. This class demonstrates a practical application of the Decorator pattern to enhance + * or modify the behavior of an existing load balancer with minimal impact on the existing infrastructure.

+ * + * @see LoadBalancer.LoadBalancerDelegate + */ + private static class SofaRpcLoadBalancer extends LoadBalancer.LoadBalancerDelegate { + + public SofaRpcLoadBalancer(LoadBalancer delegate) { + super(delegate); + } + + @Override + public T doSelect(List endpoints, Invocation invocation) { + long loadBalanceStartTime = System.nanoTime(); + T result = super.doSelect(endpoints, invocation); + RpcInvokeContext.getContext().put(RpcConstants.INTERNAL_KEY_CLIENT_BALANCER_TIME_NANO, + System.nanoTime() - loadBalanceStartTime); + return result; + + } + } }