Commit

IGNITE-23578 Fix removal of required affinity assignment instance from affinity cache - Fixes #11633.

Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
alex-plekhanov committed Nov 12, 2024
1 parent 67e2831 commit 289280f
Showing 8 changed files with 195 additions and 69 deletions.
@@ -29,7 +29,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -55,6 +54,7 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.getFloat;
@@ -76,23 +76,23 @@ public class GridAffinityAssignmentCache {
* Affinity cache will shrink when total number of non-shallow (see {@link HistoryAffinityAssignmentImpl})
* historical instances will be greater than value of this constant.
*/
private final int MAX_NON_SHALLOW_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, DFLT_AFFINITY_HISTORY_SIZE);
final int maxNonShallowHistSize = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, DFLT_AFFINITY_HISTORY_SIZE);

/**
* Affinity cache will also shrink when total number of both shallow ({@link HistoryAffinityAssignmentShallowCopy})
* and non-shallow (see {@link HistoryAffinityAssignmentImpl}) historical instances will be greater than
* value of this constant.
*/
private final int MAX_TOTAL_HIST_SIZE = MAX_NON_SHALLOW_HIST_SIZE * 10;
final int maxTotalHistSize = maxNonShallowHistSize * 10;

/**
* Independent of {@link #MAX_NON_SHALLOW_HIST_SIZE} and {@link #MAX_TOTAL_HIST_SIZE}, affinity cache will always
* Independent of {@link #maxNonShallowHistSize} and {@link #maxTotalHistSize}, affinity cache will always
* keep this number of non-shallow (see {@link HistoryAffinityAssignmentImpl}) instances.
* We need at least one real instance, otherwise we won't be able to get affinity cache for
* {@link GridCachePartitionExchangeManager#lastAffinityChangedTopologyVersion} in case cluster has experienced
* too many client joins / client leaves / local cache starts.
*/
private final int MIN_NON_SHALLOW_HIST_SIZE = 2;
private static final int MIN_NON_SHALLOW_HIST_SIZE = 2;

/** Partition distribution. */
private final float partDistribution =
@@ -144,7 +144,7 @@ public class GridAffinityAssignmentCache {
private volatile IgniteCheckedException stopErr;

/** Number of non-shallow (see {@link HistoryAffinityAssignmentImpl}) affinity cache instances. */
private final AtomicInteger nonShallowHistSize = new AtomicInteger();
private volatile int nonShallowHistSize;

/** */
private final Object similarAffKey;
@@ -311,7 +311,7 @@ public void onReconnected() {

affCache.clear();

nonShallowHistSize.set(0);
nonShallowHistSize = 0;

head.set(new GridAffinityAssignmentV2(AffinityTopologyVersion.NONE));

@@ -575,7 +575,7 @@ public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersio

Map.Entry<AffinityTopologyVersion, HistoryAffinityAssignment> prevHistEntry = affCache.floorEntry(prevVer);

HistoryAffinityAssignment newHistEntry = (prevHistEntry == null || shouldCleanupShallows()) ?
HistoryAffinityAssignment newHistEntry = (prevHistEntry == null) ?
new HistoryAffinityAssignmentImpl(assignmentCpy, backups) :
new HistoryAffinityAssignmentShallowCopy(prevHistEntry.getValue().origin(), topVer);

@@ -759,7 +759,7 @@ public AffinityAssignment readyAffinity(AffinityTopologyVersion topVer) {
", topVer=" + topVer +
", head=" + head.get().topologyVersion() +
", history=" + affCache.keySet() +
", maxNonShallowHistorySize=" + MAX_NON_SHALLOW_HIST_SIZE +
", maxNonShallowHistorySize=" + maxNonShallowHistSize +
']');
}
}
@@ -826,7 +826,7 @@ public AffinityAssignment cachedAffinity(
", lastAffChangeTopVer=" + lastAffChangeTopVer +
", head=" + head.get().topologyVersion() +
", history=" + affCache.keySet() +
", maxNonShallowHistorySize=" + MAX_NON_SHALLOW_HIST_SIZE +
", maxNonShallowHistorySize=" + maxNonShallowHistSize +
']');
}

@@ -839,7 +839,7 @@ public AffinityAssignment cachedAffinity(
", lastAffChangeTopVer=" + lastAffChangeTopVer +
", head=" + head.get().topologyVersion() +
", history=" + affCache.keySet() +
", maxNonShallowHistorySize=" + MAX_NON_SHALLOW_HIST_SIZE +
", maxNonShallowHistorySize=" + maxNonShallowHistSize +
']');
}
}
@@ -942,67 +942,52 @@ private void awaitTopologyVersion(AffinityTopologyVersion topVer) {
* @param replaced Replaced entry in case history item was already present, null otherwise.
* @param added New history item.
*/
private void onHistoryAdded(
private synchronized void onHistoryAdded(
HistoryAffinityAssignment replaced,
HistoryAffinityAssignment added
) {
boolean cleanupNeeded = false;

if (replaced == null) {
cleanupNeeded = true;

if (added.requiresHistoryCleanup())
nonShallowHistSize.incrementAndGet();
if (added.isFullSizeInstance())
nonShallowHistSize++;
}
else {
if (replaced.requiresHistoryCleanup() != added.requiresHistoryCleanup()) {
if (added.requiresHistoryCleanup()) {
cleanupNeeded = true;
else if (replaced.isFullSizeInstance() != added.isFullSizeInstance())
nonShallowHistSize += added.isFullSizeInstance() ? 1 : -1;

nonShallowHistSize.incrementAndGet();
}
else
nonShallowHistSize.decrementAndGet();
}
}
int totalSize = affCache.size();

if (!cleanupNeeded)
if (!shouldContinueCleanup(nonShallowHistSize, totalSize))
return;

int nonShallowSize = nonShallowHistSize.get();
AffinityTopologyVersion lastAffChangeTopVer =
ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(head.get().topologyVersion());

int totalSize = affCache.size();
HistoryAffinityAssignment aff0 = null;

if (shouldContinueCleanup(nonShallowSize, totalSize)) {
int initNonShallowSize = nonShallowSize;
Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();

Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
while (it.hasNext() && shouldContinueCleanup(nonShallowHistSize, totalSize)) {
aff0 = it.next();

while (it.hasNext()) {
HistoryAffinityAssignment aff0 = it.next();
if (aff0.topologyVersion().equals(lastAffChangeTopVer))
continue; // Keep lastAffinityChangedTopologyVersion, it's required for some operations.

if (aff0.requiresHistoryCleanup()) {
// We can stop cleanup only on non-shallow item.
// Keeping part of shallow items chain if corresponding real item is missing makes no sense.
if (!shouldContinueCleanup(nonShallowSize, totalSize)) {
nonShallowHistSize.getAndAdd(nonShallowSize - initNonShallowSize);
if (aff0.isFullSizeInstance()) {
if (nonShallowHistSize <= MIN_NON_SHALLOW_HIST_SIZE)
continue;

// GridAffinityProcessor#affMap has the same size and instance set as #affCache.
ctx.affinity().removeCachedAffinity(aff0.topologyVersion());
nonShallowHistSize--;
}

return;
}
totalSize--;

nonShallowSize--;
}
it.remove();
}

totalSize--;
assert aff0 != null;

it.remove();
}
ctx.affinity().removeCachedAffinity(aff0.topologyVersion());

assert false : "All elements have been removed from affinity cache during cleanup";
}
assert it.hasNext() : "All elements have been removed from affinity cache during cleanup";
}

/**
@@ -1013,15 +998,7 @@ private void onHistoryAdded(
* @return <code>true</code> if affinity cache cleanup is not finished yet.
*/
private boolean shouldContinueCleanup(int nonShallowSize, int totalSize) {
if (nonShallowSize <= MIN_NON_SHALLOW_HIST_SIZE)
return false;

return nonShallowSize > MAX_NON_SHALLOW_HIST_SIZE || totalSize > MAX_TOTAL_HIST_SIZE;
}

/** */
private boolean shouldCleanupShallows() {
return nonShallowHistSize.get() <= MIN_NON_SHALLOW_HIST_SIZE && affCache.size() > MAX_TOTAL_HIST_SIZE;
return nonShallowSize > maxNonShallowHistSize || totalSize > maxTotalHistSize;
}

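Taken together, the constants at the top of GridAffinityAssignmentCache and the shouldContinueCleanup check above define the trimming policy: keep removing the oldest history entries while the number of full-size entries exceeds maxNonShallowHistSize, or the total number of entries (full-size plus shallow copies) exceeds ten times that, but never drop below a small floor of full-size entries. A standalone Java sketch of that policy, with illustrative names and limits rather than Ignite's real classes or defaults (the real code additionally pins lastAffinityChangedTopologyVersion, which is skipped here):

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class AffinityHistoryTrimSketch {
    /** Floor of full-size entries that is always kept (mirrors MIN_NON_SHALLOW_HIST_SIZE). */
    static final int MIN_NON_SHALLOW = 2;

    /** Trimming continues while either limit is exceeded (mirrors shouldContinueCleanup). */
    static boolean shouldContinueCleanup(int nonShallow, int total, int maxNonShallow) {
        return nonShallow > maxNonShallow || total > maxNonShallow * 10;
    }

    public static void main(String[] args) {
        int maxNonShallow = 3; // Stand-in for the IGNITE_AFFINITY_HISTORY_SIZE-based limit.

        // Topology version -> is the history entry full-size (true) or a shallow copy (false)?
        TreeMap<Integer, Boolean> hist = new TreeMap<>();
        int nonShallow = 0;

        for (int ver = 1; ver <= 40; ver++) {
            boolean fullSize = ver % 10 == 0; // Pretend every 10th change produces a full-size entry.

            hist.put(ver, fullSize);

            if (fullSize)
                nonShallow++;

            // Trim oldest entries first; full-size entries are skipped once only the floor remains.
            Iterator<Map.Entry<Integer, Boolean>> it = hist.entrySet().iterator();

            while (it.hasNext() && shouldContinueCleanup(nonShallow, hist.size(), maxNonShallow)) {
                Map.Entry<Integer, Boolean> oldest = it.next();

                if (oldest.getValue()) {
                    if (nonShallow <= MIN_NON_SHALLOW)
                        continue;

                    nonShallow--;
                }

                it.remove();
            }
        }

        System.out.println("kept=" + hist.size() + ", fullSize=" + nonShallow);
    }
}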
@@ -25,7 +25,7 @@ public interface HistoryAffinityAssignment extends AffinityAssignment {
*
* @return <code>true</code> if adding this instance to history should trigger size check and possible cleanup.
*/
public boolean requiresHistoryCleanup();
public boolean isFullSizeInstance();

/**
* In case this instance is lightweight wrapper of another instance, this method should return reference
@@ -331,7 +331,7 @@ private List<ClusterNode> partitionNodes(int p, boolean ideal, int cpys) {
}

/** {@inheritDoc} */
@Override public boolean requiresHistoryCleanup() {
@Override public boolean isFullSizeInstance() {
return true;
}

@@ -46,7 +46,7 @@ public HistoryAffinityAssignmentShallowCopy(
}

/** {@inheritDoc} */
@Override public boolean requiresHistoryCleanup() {
@Override public boolean isFullSizeInstance() {
return false;
}

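On the renamed method: both overrides above express the same split, full-size entries own their assignment data, while shallow copies (created for client joins/leaves and similar events that keep the previous assignment) only point to an earlier full-size origin. A minimal standalone sketch of that shape, using made-up names rather than the real Ignite interfaces:

/** Illustrative stand-in for a history entry; not the real HistoryAffinityAssignment API. */
interface HistEntry {
    /** True for entries that hold their own assignment data and count toward the non-shallow limit. */
    boolean isFullSizeInstance();

    /** Entry that actually owns the data: itself for full-size entries, the wrapped one for shallow copies. */
    HistEntry origin();
}

/** Full-size entry, created when the assignment really changes. */
final class FullEntry implements HistEntry {
    @Override public boolean isFullSizeInstance() {
        return true;
    }

    @Override public HistEntry origin() {
        return this;
    }
}

/** Shallow copy, created for topology changes that reuse the previous assignment. */
final class ShallowEntry implements HistEntry {
    private final HistEntry origin;

    ShallowEntry(HistEntry origin) {
        this.origin = origin;
    }

    @Override public boolean isFullSizeInstance() {
        return false;
    }

    @Override public HistEntry origin() {
        return origin;
    }
}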
@@ -17,9 +17,9 @@

package org.apache.ignite.cache.affinity;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -175,12 +175,67 @@ public void testAffinityHistoryCleanup() throws Exception {
startGrid(4);

checkHistory(ignite, F.asList(
topVer(5, 0), // Client event -> FullHistSize = 5.
topVer(6, 0), // Client event -> FullHistSize = 5.
topVer(7, 0), // Client event -> FullHistSize = 5.
topVer(8, 0), // Client event -> FullHistSize = 5.
topVer(9, 0), // Client event -> FullHistSize = 5.
topVer(10, 0), // Client event -> FullHistSize = 5.
topVer(11, 0), // FullHistSize = 5.
topVer(11, 1), // FullHistSize = 5.
topVer(12, 0), // FullHistSize = 6 - 1 = 5.
topVer(13, 0), // FullHistSize = 5.
topVer(13, 1)), // FullHistSize = 6 - 1 = 5.
5);

stopGrid(4);

checkHistory(ignite, F.asList(
topVer(11, 1), // FullHistSize = 5.
topVer(12, 0), // FullHistSize = 6 - 1 = 5.
topVer(13, 0), // FullHistSize = 5.
topVer(13, 1), // FullHistSize = 6 - 1 = 5.
topVer(14, 0)), // FullHistSize = 6 - 1 = 5.
5);
}

/**
* @throws Exception If failed.
*/
@Test
@WithSystemProperty(key = IGNITE_AFFINITY_HISTORY_SIZE, value = "2")
public void testAffinityShallowHistoryCleanup() throws Exception {
Ignite ignite = startGrid(0);

checkHistory(ignite, F.asList(topVer(1, 0)), 1);

for (int i = 0; i < 15; i++) {
startClientGrid(1);
stopGrid(1);
}

List<AffinityTopologyVersion> expVers = new ArrayList<>();
expVers.add(topVer(1, 0));

for (int i = 13; i <= 31; i++)
expVers.add(topVer(i, 0));

checkHistory(ignite, expVers, 1);

startGrid(1);

expVers.clear();

for (int i = 14; i <= 32; i++)
expVers.add(topVer(i, 0));

expVers.add(topVer(32, 1));

checkHistory(ignite, expVers, 2);

stopGrid(1);

checkHistory(ignite, F.asList(topVer(32, 1), topVer(33, 0)), 2);
}

/**
@@ -199,9 +254,9 @@ private void checkHistory(Ignite ignite, List<AffinityTopologyVersion> expHist,
for (GridCacheContext cctx : proc.context().cacheContexts()) {
GridAffinityAssignmentCache aff = GridTestUtils.getFieldValue(cctx.affinity(), "aff");

AtomicInteger fullHistSize = GridTestUtils.getFieldValue(aff, "nonShallowHistSize");
int fullHistSize = GridTestUtils.getFieldValue(aff, "nonShallowHistSize");

assertEquals(expSize, fullHistSize.get());
assertEquals(expSize, fullHistSize);

Map<AffinityTopologyVersion, Object> cache = GridTestUtils.getFieldValue(aff, "affCache");

@@ -20,11 +20,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -157,4 +159,40 @@ public void testAffinityCacheSizeOnReconnect() throws Exception {
}
}
}

/** */
@Test
public void testAffinityCacheMaxSize() throws Exception {
try (IgniteEx server = startGrids(2)) {
server.getOrCreateCache(DEFAULT_CACHE_NAME);

CacheGroupContext gctx = server.context().cache().cache(DEFAULT_CACHE_NAME).context().group();
Set<AffinityTopologyVersion> affCache = gctx.affinity().cachedVersions();
int maxTotalHistSize = gctx.affinity().maxTotalHistSize;
int maxNonShallowHistSize = gctx.affinity().maxNonShallowHistSize;

for (int i = 0; i < (maxTotalHistSize - 10) / 2; i++) {
server.createCache(DEFAULT_CACHE_NAME + i);
server.destroyCache(DEFAULT_CACHE_NAME + i);
}

startClientGridsMultiThreaded(2, 10);

assertEquals(maxTotalHistSize, affCache.size());

for (int i = 0; i < maxTotalHistSize / 2; i++) {
server.createCache(DEFAULT_CACHE_NAME + i);
server.destroyCache(DEFAULT_CACHE_NAME + i);
}

assertEquals(maxTotalHistSize, affCache.size());

for (int i = 0; i < maxNonShallowHistSize / 2 + 1; i++) {
stopGrid(1);
startGrid(1);
}

assertEquals(maxNonShallowHistSize, affCache.size());
}
}
}
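The tests above cap the affinity history through the IGNITE_AFFINITY_HISTORY_SIZE system property (via @WithSystemProperty); outside of tests the same property can be set before starting a node. A minimal usage sketch; the value 100 and the instance name are arbitrary examples, not recommendations:

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;

public class AffinityHistorySizeExample {
    public static void main(String[] args) {
        // Set before node start so the affinity caches pick the limit up when they are created.
        System.setProperty("IGNITE_AFFINITY_HISTORY_SIZE", "100");

        try (Ignite ignite = Ignition.start(new IgniteConfiguration().setIgniteInstanceName("affinity-history-example"))) {
            ignite.getOrCreateCache("example-cache");
        }
    }
}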