Skip to content

Commit

Permalink
IGNITE-20462 Fix idle_verify hash conflicts for expiring entries - Fixes
Browse files Browse the repository at this point in the history
 #10947.

Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
  • Loading branch information
alex-plekhanov committed Sep 26, 2023
1 parent 1ae2edc commit 158bd2c
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
Expand All @@ -52,6 +53,9 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
Expand Down Expand Up @@ -2393,6 +2397,38 @@ public void testCacheIdleVerifyMovingParts() throws Exception {
assertContains(log, testOut.toString(), "MOVING partitions");
}

/**
* @throws Exception If failed.
*/
@Test
public void testCacheIdleVerifyExpiringEntries() throws Exception {
IgniteEx ignite = startGrids(3);

ignite.cluster().state(ACTIVE);

IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
.setAffinity(new RendezvousAffinityFunction(false, 32))
.setBackups(1));

Random rnd = new Random();

// Put without expiry policy.
for (int i = 0; i < 5_000; i++)
cache.put(i, i);

// Put with expiry policy.
for (int i = 5_000; i < 10_000; i++) {
ExpiryPolicy expPol = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, rnd.nextInt(1_000)));
cache.withExpiryPolicy(expPol).put(i, i);
}

injectTestSystemOut();

assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));

assertContains(log, testOut.toString(), "no conflicts have been found");
}

/** */
@Test
public void testCacheSequence() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public SnapshotPartitionsQuickVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
if (other == null)
return;

if (val.size() != other.size() || !Objects.equals(val.updateCounter(), other.updateCounter()))
if ((!val.hasExpiringEntries() && !other.hasExpiringEntries() && val.size() != other.size())
|| !Objects.equals(val.updateCounter(), other.updateCounter()))
wrnGrps.add(part.groupId());
}));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@
import org.apache.ignite.internal.managers.encryption.EncryptionCacheKeyProvider;
import org.apache.ignite.internal.managers.encryption.GroupKey;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
Expand Down Expand Up @@ -102,7 +105,7 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
/** {@inheritDoc} */
@Override public Map<PartitionKeyV2, PartitionHashRecordV2> invoke(SnapshotHandlerContext opCtx) throws IgniteCheckedException {
if (!opCtx.snapshotDirectory().exists())
throw new IgniteCheckedException("Snapshot directory doesn't exists: " + opCtx.snapshotDirectory());;
throw new IgniteCheckedException("Snapshot directory doesn't exists: " + opCtx.snapshotDirectory());

SnapshotMetadata meta = opCtx.metadata();

Expand Down Expand Up @@ -271,6 +274,10 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext<?, ?> cctx) {

assert hash != null : "OWNING must have hash: " + key;

// We should skip size comparison if there are entries to expire exist.
if (hasExpiringEntries(snpCtx, pageStore, pageBuff, io.getPendingTreeRoot(pageAddr)))
hash.hasExpiringEntries(true);

res.put(key, hash);
}
catch (IOException e) {
Expand All @@ -294,6 +301,40 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
return res;
}

/** */
private boolean hasExpiringEntries(
GridKernalContext ctx,
PageStore pageStore,
ByteBuffer pageBuff,
long pendingTreeMetaId
) throws IgniteCheckedException {
if (pendingTreeMetaId == 0)
return false;

long pageAddr = GridUnsafe.bufferAddress(pageBuff);

pageBuff.clear();
pageStore.read(pendingTreeMetaId, pageBuff, true);

if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE)
ctx.compress().decompressPage(pageBuff, pageStore.getPageSize());

BPlusMetaIO treeIO = BPlusMetaIO.VERSIONS.forPage(pageAddr);

int rootLvl = treeIO.getRootLevel(pageAddr);
long rootId = treeIO.getFirstPageId(pageAddr, rootLvl);

pageBuff.clear();
pageStore.read(rootId, pageBuff, true);

if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE)
ctx.compress().decompressPage(pageBuff, pageStore.getPageSize());

BPlusIO<?> rootIO = PageIO.getPageIO(pageBuff);

return rootIO.getCount(pageAddr) != 0;
}

/** {@inheritDoc} */
@Override public void complete(String name,
Collection<SnapshotHandlerResult<Map<PartitionKeyV2, PartitionHashRecordV2>>> results) throws IgniteCheckedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ public static List<Integer> compareUpdateCounters(
while (it.hasNextX()) {
CacheDataRow row = it.nextX();

if (row.expireTime() > 0)
continue;

partHash += row.key().hashCode();
partVerHash += row.version().hashCode(); // Detects ABA problem.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
@GridToStringExclude
private int regKeys;

/** If partition has entries to expire. */
@GridToStringExclude
private boolean hasExpiringEntries;

/**
* @param partKey Partition key.
* @param isPrimary Is primary.
Expand Down Expand Up @@ -219,6 +223,16 @@ public int regularKeys() {
return regKeys;
}

/** */
public boolean hasExpiringEntries() {
return hasExpiringEntries;
}

/** */
public void hasExpiringEntries(boolean hasExpiringEntries) {
this.hasExpiringEntries = hasExpiringEntries;
}

/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
out.writeObject(partKey);
Expand All @@ -233,6 +247,7 @@ public int regularKeys() {
out.writeInt(noCfKeys);
out.writeInt(binKeys);
out.writeInt(regKeys);
out.writeBoolean(hasExpiringEntries);
}

/** {@inheritDoc} */
Expand All @@ -255,6 +270,7 @@ public int regularKeys() {
noCfKeys = in.readInt();
binKeys = in.readInt();
regKeys = in.readInt();
hasExpiringEntries = in.readBoolean();
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
Expand Down Expand Up @@ -600,6 +604,32 @@ public void testClusterSnapshotCheckMultipleTimes() throws Exception {
assertTrue("Threads created: " + createdThreads, createdThreads < iterations);
}

/** */
@Test
public void testClusterSnapshotCheckWithExpiring() throws Exception {
IgniteEx ignite = startGrids(3);

ignite.cluster().state(ACTIVE);

IgniteCache<Object, Object> cache = ignite.getOrCreateCache(new CacheConfiguration<>("expCache")
.setAffinity(new RendezvousAffinityFunction(false, 32)).setBackups(1));

Random rnd = new Random();

for (int i = 0; i < 10_000; i++) {
cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS,
rnd.nextInt(10_000)))).put(i, i);
}

long timeout = getTestTimeout();

snp(ignite).createSnapshot(SNAPSHOT_NAME).get(timeout);

SnapshotPartitionsVerifyTaskResult res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get(timeout);

assertFalse(res.idleVerifyResult().hasConflicts());
}

/**
* @param cls Class of running task.
* @param results Results of compute.
Expand Down

0 comments on commit 158bd2c

Please sign in to comment.