Skip to content

Commit

Permalink
add sticky policy
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed May 9, 2024
1 parent e7dab2f commit 3d23110
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,23 @@ default Predicate<Endpoint> getPredicate() {
return null;
}

/**
* Evaluates the predicate associated with this endpoint, if any, to determine
* if this endpoint satisfies the conditions defined by the predicate.
* <p>
* This method will return {@code true} if no predicate is set, implying that
* the endpoint is acceptable by default. If a predicate is set, the endpoint
* will be tested against it.
* </p>
*
* @return {@code true} if the predicate is {@code null} or if the predicate
* test passes for this endpoint; {@code false} otherwise.
*/
default boolean predicate() {
Predicate<Endpoint> predicate = getPredicate();
return predicate == null || predicate.test(this);
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@
@Extensible(value = "RouteFilter")
public interface RouteFilter {

int ORDER_VIRTUAL = 100;
int ORDER_RETRY = 100;

int ORDER_LIVE_UNIT = ORDER_VIRTUAL + 100;
int ORDER_STICKY = ORDER_RETRY + 100;

int ORDER_LOCALHOST = ORDER_LIVE_UNIT + 100;
int ORDER_LOCALHOST = ORDER_STICKY + 100;

int ORDER_RETRY = ORDER_LOCALHOST + 100;
int ORDER_HEALTH = ORDER_LOCALHOST + 100;

int ORDER_STICKY = ORDER_RETRY + 100;
int ORDER_VIRTUAL = ORDER_HEALTH + 100;

int ORDER_HEALTH = ORDER_STICKY + 100;
int ORDER_LIVE_UNIT = ORDER_VIRTUAL + 100;

int ORDER_TAG_ROUTE = ORDER_HEALTH + 100;
int ORDER_TAG_ROUTE = ORDER_LIVE_UNIT + 100;

int ORDER_LANE = ORDER_TAG_ROUTE + 100;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.jd.live.agent.core.inject.annotation.InjectLoader;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.context.RequestContext;
import com.jd.live.agent.governance.instance.Endpoint;
import com.jd.live.agent.governance.invoke.OutboundInvocation;
import com.jd.live.agent.governance.invoke.RouteTarget;
Expand All @@ -31,13 +32,13 @@
import com.jd.live.agent.governance.invoke.loadbalance.randomweight.RandomWeightLoadBalancer;
import com.jd.live.agent.governance.policy.service.ServicePolicy;
import com.jd.live.agent.governance.policy.service.loadbalance.LoadBalancePolicy;
import com.jd.live.agent.governance.request.Request;
import com.jd.live.agent.governance.request.ServiceRequest;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
* LoadBalanceFilter applies load balancing to the list of route targets. It ensures that
Expand All @@ -58,23 +59,27 @@ public class LoadBalanceFilter implements RouteFilter {
public <T extends ServiceRequest.OutboundRequest> void filter(OutboundInvocation<T> invocation, RouteFilterChain chain) {
RouteTarget target = invocation.getRouteTarget();
if (!target.isEmpty()) {
LoadBalancer loadBalancer = getLoadBalancer(invocation);
target.choose(endpoints -> {
List<? extends Endpoint> backends = endpoints;
do {
Endpoint backend = loadBalancer.choose(backends, invocation);
if (backend == null) {
return null;
}
Predicate<Endpoint> predicate = backend.getPredicate();
if (predicate == null || predicate.test(backend)) {
return Collections.singletonList(backend);
}
backends = backends == endpoints ? new ArrayList<>(endpoints) : backends;
backends.remove(backend);
} while (!backends.isEmpty());
return null;
});
List<? extends Endpoint> candidates = preferSticky(target);
if (candidates != null && !candidates.isEmpty()) {
target.setEndpoints(candidates);
} else {
LoadBalancer loadBalancer = getLoadBalancer(invocation);
target.choose(endpoints -> {
List<? extends Endpoint> backends = endpoints;
do {
Endpoint backend = loadBalancer.choose(backends, invocation);
if (backend == null) {
return null;
}
if (backend.predicate()) {
return Collections.singletonList(backend);
}
backends = backends == endpoints ? new ArrayList<>(endpoints) : backends;
backends.remove(backend);
} while (!backends.isEmpty());
return null;
});
}
}

T request = invocation.getRequest();
Expand All @@ -91,6 +96,31 @@ public <T extends ServiceRequest.OutboundRequest> void filter(OutboundInvocation
chain.filter(invocation);
}

/**
* Attempts to select an endpoint based on a sticky identifier from the provided route target.
* <p>
* This method retrieves a preferred sticky identifier from the request context and attempts
* to find an endpoint from the route target that matches this identifier. If a matching
* endpoint is found and it satisfies any associated predicate, it is returned. Otherwise,
* this method returns {@code null}, indicating no suitable sticky endpoint was found.
* </p>
*
* @param target The {@link RouteTarget} containing potential endpoints to stick to.
* @return A list containing the matched endpoint if one is found and satisfies the predicate;
* otherwise, {@code null}.
*/
private List<? extends Endpoint> preferSticky(RouteTarget target) {
// preferred sticky id
String id = RequestContext.removeAttribute(Request.KEY_STICKY_ID);
if (id != null) {
List<? extends Endpoint> backends = RouteTarget.filter(target.getEndpoints(), endpoint -> id.equals(endpoint.getId()), 1);
if (!backends.isEmpty() && backends.get(0).predicate()) {
return backends;
}
}
return null;
}

/**
* Retrieves the appropriate load balancer based on the service policy of the current invocation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import com.jd.live.agent.governance.invoke.RouteTarget;
import com.jd.live.agent.governance.invoke.filter.RouteFilter;
import com.jd.live.agent.governance.invoke.filter.RouteFilterChain;
import com.jd.live.agent.governance.policy.service.ServicePolicy;
import com.jd.live.agent.governance.policy.service.loadbalance.LoadBalancePolicy;
import com.jd.live.agent.governance.policy.service.loadbalance.StickyType;
import com.jd.live.agent.governance.request.Request;
import com.jd.live.agent.governance.request.ServiceRequest.OutboundRequest;

Expand All @@ -38,15 +41,27 @@ public class StickyFilter implements RouteFilter {

@Override
public <T extends OutboundRequest> void filter(OutboundInvocation<T> invocation, RouteFilterChain chain) {
RouteTarget target = invocation.getRouteTarget();
// Get the sticky ID from the request, if available
String id = invocation.getRequest().getStickyId();
// first remove sticky id from context
String ctxId = RequestContext.removeAttribute(Request.KEY_STICKY_ID);
final String stickyId = id != null && !id.isEmpty() ? id : ctxId;
// If a sticky ID is available, filter the targets to only include the one with the sticky ID
if (stickyId != null && !stickyId.isEmpty()) {
target.filter(endpoint -> stickyId.equals(endpoint.getId()), 1);
ServicePolicy servicePolicy = invocation.getServiceMetadata().getServicePolicy();
LoadBalancePolicy loadBalancePolicy = servicePolicy == null ? null : servicePolicy.getLoadBalancePolicy();
StickyType stickyType = loadBalancePolicy == null ? StickyType.NONE : loadBalancePolicy.getStickyType();
stickyType = stickyType == null ? StickyType.NONE : stickyType;
if (stickyType != StickyType.NONE) {
RouteTarget target = invocation.getRouteTarget();
// Get the sticky ID from the request, if available
String id = invocation.getRequest().getStickyId();
// first remove sticky id from context
String ctxId = RequestContext.removeAttribute(Request.KEY_STICKY_ID);
final String stickyId = id != null && !id.isEmpty() ? id : ctxId;
// If a sticky ID is available, filter the targets to only include the one with the sticky ID
if (stickyId != null && !stickyId.isEmpty()) {
if (stickyType == StickyType.FIXED) {
target.filter(endpoint -> stickyId.equals(endpoint.getId()), 1);
} else {
RequestContext.setAttribute(Request.KEY_STICKY_ID, stickyId);
}
}
} else {
RequestContext.removeAttribute(Request.KEY_STICKY_ID);
}
chain.filter(invocation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class VirtualFilter implements RouteFilter {
@Override
public <T extends OutboundRequest> void filter(OutboundInvocation<T> invocation, RouteFilterChain chain) {
List<? extends Endpoint> instances = invocation.getInstances();
if (instances != null && size > 0 && instances.size() < size) {
if (size > 0 && instances != null && !instances.isEmpty() && instances.size() < size) {
List<Endpoint> result = new ArrayList<>(size);
result.addAll(instances);
int remain = size - instances.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public class LoadBalancePolicy implements PolicyInheritWithId<LoadBalancePolicy>
*/
private String policyType;

/**
* sticky type
*/
private StickyType stickyType = StickyType.NONE;

/**
* Constructs a new, empty {@code LoadBalancePolicy}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.jd.live.agent.governance.policy.service.loadbalance;

/**
* Defines the stickiness type for load balancing or session handling.
*/
public enum StickyType {

/**
* No stickiness. Each request may be handled by any available provider
* without preference.
*/
NONE,

/**
* Preferred stickiness. Requests prefer to be handled by a previously
* used provider but may fall back to others if necessary.
*/
PREFERRED,

/**
* Fixed stickiness. Requests are strictly bound to a specific provider
* once they have used it for the first time.
*/
FIXED
}

0 comments on commit 3d23110

Please sign in to comment.