Skip to content

Commit

Permalink
Merge pull request #19 from chenzhiguo/fix
Browse files Browse the repository at this point in the history
Fix #16
  • Loading branch information
hexiaofeng authored Aug 23, 2024
2 parents 682ce7b + 604e743 commit a3d7eb6
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class EndpointGroup {
* A map that associates unit strings with UnitGroup objects. Each UnitGroup contains
* a collection of Endpoint objects that share the same unit value.
*/
private final Map<String, UnitGroup> units;
@Getter
private final Map<String, UnitGroup> unitGroups;

/**
* Constructs a new EndpointGroup with the specified list of endpoints. The endpoints
Expand All @@ -54,14 +55,14 @@ public class EndpointGroup {
@SuppressWarnings("unchecked")
public EndpointGroup(List<? extends Endpoint> endpoints) {
this.endpoints = endpoints == null || endpoints.isEmpty() ? new ArrayList<>() : (List<Endpoint>) endpoints;
this.units = new HashMap<>(3);
this.unitGroups = new HashMap<>(3);
UnitGroup last = null;
String unit;
for (Endpoint endpoint : this.endpoints) {
unit = endpoint.getUnit();
unit = (unit == null) ? Constants.DEFAULT_VALUE : unit;
if (last == null || !last.getUnit().equals(unit)) {
last = units.computeIfAbsent(unit, UnitGroup::new);
last = unitGroups.computeIfAbsent(unit, UnitGroup::new);
}
last.add(endpoint);
}
Expand All @@ -75,7 +76,7 @@ public EndpointGroup(List<? extends Endpoint> endpoints) {
* @return the UnitGroup for the specified unit, or null if not found
*/
public UnitGroup getUnitGroup(String unit) {
return (unit == null) ? null : units.get(unit);
return (unit == null) ? null : unitGroups.get(unit);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class UnitGroup {
* A map that associates cell strings with CellGroup objects. Each CellGroup contains a collection
* of Endpoint objects that share the same cell value within this unit.
*/
private final Map<String, CellGroup> cells;
@Getter
private final Map<String, CellGroup> cellGroups;

private CellGroup lastCellGroup;

Expand All @@ -55,7 +56,7 @@ public class UnitGroup {
*/
public UnitGroup(String unit) {
this.unit = Objects.requireNonNull(unit, "Unit cannot be null");
this.cells = new HashMap<>(5);
this.cellGroups = new HashMap<>(5);
}

/**
Expand All @@ -81,7 +82,7 @@ public void add(Endpoint endpoint) {
endpoints.add(endpoint);
String cell = endpoint.getCell();
if (lastCellGroup == null || !lastCellGroup.getCell().equals(cell)) {
lastCellGroup = cells.computeIfAbsent(cell, c -> new CellGroup(unit, c));
lastCellGroup = cellGroups.computeIfAbsent(cell, c -> new CellGroup(unit, c));
}
lastCellGroup.add(endpoint);
}
Expand All @@ -94,16 +95,16 @@ public void add(Endpoint endpoint) {
* @return the CellGroup for the specified cell, or null if not found
*/
public CellGroup getCell(String cell) {
return cell == null ? null : cells.get(cell);
return cell == null ? null : cellGroups.get(cell);
}

/**
* Returns the total number of cell groups within this unit group.
*
* @return the size of the cells map
* @return the size of the cellGroups map
*/
public int getCells() {
return cells.size();
return cellGroups.size();
}

/**
Expand Down Expand Up @@ -134,7 +135,7 @@ public Integer getSize(String cell) {
if (cell == null) {
return null;
} else {
CellGroup group = cells.get(cell);
CellGroup group = cellGroups.get(cell);
return group == null ? null : group.size();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class RouteTarget {
/**
* The group of endpoints that this route target is associated with.
*/
@Getter
private final EndpointGroup instanceGroup;

/**
Expand Down Expand Up @@ -99,7 +100,7 @@ public RouteTarget(List<? extends Endpoint> instances, EndpointGroup instanceGro
Unit unit, UnitAction unitAction, UnitRoute unitRoute, CellRoute cellRoute) {
this.instances = instances == null && instanceGroup != null ? instanceGroup.getEndpoints() : instances;
this.unit = unit == null && unitRoute != null ? unitRoute.getUnit() : unit;
this.instanceGroup = instanceGroup == null && instances != null && this.unit != null ? new EndpointGroup(instances) : instanceGroup;
this.instanceGroup = instanceGroup == null && instances != null ? new EndpointGroup(instances) : instanceGroup;
this.unitGroup = this.instanceGroup == null || this.unit == null ? null : this.instanceGroup.getUnitGroup(this.unit.getCode());
this.unitAction = unitAction;
this.unitRoute = unitRoute;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
import lombok.Getter;
import lombok.Setter;

import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;

Expand Down Expand Up @@ -67,7 +66,7 @@ public <T extends OutboundRequest> void filter(OutboundInvocation<T> invocation,
* Forwards an OutboundInvocation to a specific RouteTarget based on various service policies and configurations.
*
* @param invocation The OutboundInvocation to be forwarded.
* @param target The RouteTarget where the invocation should be directed.
* @param target The RouteTarget where the invocation should be directed.
* @return true if the routing decision was successful and endpoints were set, false otherwise.
*/
private boolean forward(OutboundInvocation<?> invocation, RouteTarget target) {
Expand All @@ -76,6 +75,32 @@ private boolean forward(OutboundInvocation<?> invocation, RouteTarget target) {
ServiceConfig serviceConfig = serviceMetadata.getServiceConfig();
ServiceLivePolicy livePolicy = serviceMetadata.getServiceLivePolicy();
CellPolicy cellPolicy = livePolicy == null ? null : livePolicy.getCellPolicy();

// The service does not participate in cell traffic scheduling but needs to exclude disabled cells.
if (cellPolicy == null && target.getInstanceGroup().getUnitGroups() != null) {
Set<String> unavailableCells = new HashSet<>();
for (Map.Entry<String, UnitGroup> unitGroupEntry : target.getInstanceGroup().getUnitGroups().entrySet()) {
UnitGroup unitGroup = unitGroupEntry.getValue();
LiveSpace liveSpace = liveMetadata.getLiveSpace();
Unit unit = liveSpace.getUnit(unitGroupEntry.getKey());
if (unitGroup.getCellGroups() == null) {
continue;
}
if (!invocation.isAccessible(unit)) {
unavailableCells.addAll(unitGroup.getCellGroups().keySet());
} else {
for (Map.Entry<String, CellGroup> cellGroupEntry : unitGroup.getCellGroups().entrySet()) {
Cell cell = unit.getCell(cellGroupEntry.getKey());
if (!invocation.isAccessible(cell)) {
unavailableCells.add(cellGroupEntry.getKey());
}
}
}
}
target.filter(endpoint -> !unavailableCells.contains(endpoint.getCell()));
return true;
}

boolean localFirst = cellPolicy == CellPolicy.PREFER_LOCAL_CELL || cellPolicy == null && serviceConfig.isLocalFirst();
Function<String, Integer> thresholdFunc = !localFirst ? null : (
cellPolicy == CellPolicy.PREFER_LOCAL_CELL
Expand All @@ -85,10 +110,11 @@ private boolean forward(OutboundInvocation<?> invocation, RouteTarget target) {
if (unit == null) {
// unit policy is none.
if (localFirst && !target.isEmpty()) {
filterLocal(target, liveMetadata, thresholdFunc);
return filterLocal(invocation, target, liveMetadata, thresholdFunc);
}
return true;
}

UnitGroup unitGroup = target.getUnitGroup();
// previous filters may filtrate the endpoints
unitGroup = unitGroup != null && unitGroup.getEndpoints() == target.getEndpoints() && unitGroup.size() == target.size()
Expand Down Expand Up @@ -117,17 +143,23 @@ private boolean forward(OutboundInvocation<?> invocation, RouteTarget target) {
/**
* Filters the endpoints in the given {@code RouteTarget} based on their locality to the current unit and cell.
*
* @param invocation The outbound invocation containing metadata for the election.
* @param target The {@code RouteTarget} containing the endpoints to be filtered. If empty or if localFirst is false, no filtering is applied.
* @param liveMetadata The live metadata providing information about the current unit and cell.
* @param thresholdFunc A function that returns a threshold value for the current cell. If the number of local endpoints exceeds this threshold,
* those endpoints are preferred.
* @return true if the routing decision was successful and endpoints were set, false otherwise.
*/
private void filterLocal(RouteTarget target,
LiveMetadata liveMetadata,
Function<String, Integer> thresholdFunc) {
private boolean filterLocal(OutboundInvocation<?> invocation,
RouteTarget target,
LiveMetadata liveMetadata,
Function<String, Integer> thresholdFunc) {
String liveSpaceId = liveMetadata.getLiveSpaceId();
Unit currentUnit = liveMetadata.getCurrentUnit();
Cell currentCell = liveMetadata.getCurrentCell();
if (!invocation.isAccessible(currentUnit) || !invocation.isAccessible(currentCell)) {
return false;
}
if (liveSpaceId != null && currentUnit != null && currentCell != null) {
List<Endpoint> cellEndpoints = new ArrayList<>();
List<Endpoint> unitEndpoints = new ArrayList<>();
Expand All @@ -146,6 +178,7 @@ private void filterLocal(RouteTarget target,
target.setEndpoints(unitEndpoints);
}
}
return true;
}

/**
Expand All @@ -154,10 +187,10 @@ private void filterLocal(RouteTarget target,
* candidates for the election based on the cell's weight, priority, and instance count.
* The method also considers local preference and failover thresholds.
*
* @param invocation The outbound invocation containing metadata for the election.
* @param unitRoute The route containing cells to be considered as candidates.
* @param localFirst A boolean indicating whether local cells should be preferred.
* @param unitGroup The group from which to retrieve the size of instances for each cell.
* @param invocation The outbound invocation containing metadata for the election.
* @param unitRoute The route containing cells to be considered as candidates.
* @param localFirst A boolean indicating whether local cells should be preferred.
* @param unitGroup The group from which to retrieve the size of instances for each cell.
* @param failoverThresholdFunc A function that provides the failover threshold for each cell.
* @return An Election object representing the sponsored election.
*/
Expand Down Expand Up @@ -353,14 +386,14 @@ private static class Election {
/**
* Constructs a new Election with the provided candidates and election parameters.
*
* @param candidates The list of candidates participating in the election.
* @param weights The total weight of all candidates.
* @param instances The total number of instances across all candidates.
* @param winner The current winner of the election.
* @param candidates The list of candidates participating in the election.
* @param weights The total weight of all candidates.
* @param instances The total number of instances across all candidates.
* @param winner The current winner of the election.
* @param failoverThresholdFunc A function for determining failover thresholds.
*/
Election(List<Candidate> candidates, int weights, int instances, Candidate winner,
Function<String, Integer> failoverThresholdFunc) {
Function<String, Integer> failoverThresholdFunc) {
this.candidates = candidates;
this.weights = weights;
this.instances = instances;
Expand Down
Loading

0 comments on commit a3d7eb6

Please sign in to comment.