From 158bd2c0d6546d01e55e541dc1b9a3156713d183 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Tue, 26 Sep 2023 14:16:39 +0300 Subject: [PATCH] IGNITE-20462 Fix idle_verify hash conflicts for expiring entries - Fixes #10947. Signed-off-by: Aleksey Plekhanov --- .../ignite/util/GridCommandHandlerTest.java | 36 ++++++++++++++++ .../SnapshotPartitionsQuickVerifyHandler.java | 3 +- .../SnapshotPartitionsVerifyHandler.java | 43 ++++++++++++++++++- .../cache/verify/IdleVerifyUtility.java | 3 ++ .../cache/verify/PartitionHashRecordV2.java | 16 +++++++ .../IgniteClusterSnapshotCheckTest.java | 30 +++++++++++++ 6 files changed, 129 insertions(+), 2 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index abb7a50c4fb58..6a40bfb738ea3 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -52,6 +53,9 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; @@ -2393,6 +2397,38 @@ public void testCacheIdleVerifyMovingParts() throws Exception { assertContains(log, testOut.toString(), "MOVING partitions"); } + /** + * @throws Exception If failed. + */ + @Test + public void testCacheIdleVerifyExpiringEntries() throws Exception { + IgniteEx ignite = startGrids(3); + + ignite.cluster().state(ACTIVE); + + IgniteCache cache = ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setBackups(1)); + + Random rnd = new Random(); + + // Put without expiry policy. + for (int i = 0; i < 5_000; i++) + cache.put(i, i); + + // Put with expiry policy. + for (int i = 5_000; i < 10_000; i++) { + ExpiryPolicy expPol = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, rnd.nextInt(1_000))); + cache.withExpiryPolicy(expPol).put(i, i); + } + + injectTestSystemOut(); + + assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify")); + + assertContains(log, testOut.toString(), "no conflicts have been found"); + } + /** */ @Test public void testCacheSequence() throws Exception { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java index 8bc5166e8d396..e378dec9b3539 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java @@ -89,7 +89,8 @@ public SnapshotPartitionsQuickVerifyHandler(GridCacheSharedContext cctx) { if (other == null) return; - if (val.size() != other.size() || !Objects.equals(val.updateCounter(), other.updateCounter())) + if ((!val.hasExpiringEntries() && !other.hasExpiringEntries() && val.size() != other.size()) + || !Objects.equals(val.updateCounter(), other.updateCounter())) wrnGrps.add(part.groupId()); })); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java index 3547d5edf30b2..e24a51c6b084a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java @@ -45,12 +45,15 @@ import org.apache.ignite.internal.managers.encryption.EncryptionCacheKeyProvider; import org.apache.ignite.internal.managers.encryption.GroupKey; import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted; +import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; @@ -102,7 +105,7 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext cctx) { /** {@inheritDoc} */ @Override public Map invoke(SnapshotHandlerContext opCtx) throws IgniteCheckedException { if (!opCtx.snapshotDirectory().exists()) - throw new IgniteCheckedException("Snapshot directory doesn't exists: " + opCtx.snapshotDirectory());; + throw new IgniteCheckedException("Snapshot directory doesn't exists: " + opCtx.snapshotDirectory()); SnapshotMetadata meta = opCtx.metadata(); @@ -271,6 +274,10 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext cctx) { assert hash != null : "OWNING must have hash: " + key; + // We should skip size comparison if there are entries to expire exist. + if (hasExpiringEntries(snpCtx, pageStore, pageBuff, io.getPendingTreeRoot(pageAddr))) + hash.hasExpiringEntries(true); + res.put(key, hash); } catch (IOException e) { @@ -294,6 +301,40 @@ public SnapshotPartitionsVerifyHandler(GridCacheSharedContext cctx) { return res; } + /** */ + private boolean hasExpiringEntries( + GridKernalContext ctx, + PageStore pageStore, + ByteBuffer pageBuff, + long pendingTreeMetaId + ) throws IgniteCheckedException { + if (pendingTreeMetaId == 0) + return false; + + long pageAddr = GridUnsafe.bufferAddress(pageBuff); + + pageBuff.clear(); + pageStore.read(pendingTreeMetaId, pageBuff, true); + + if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE) + ctx.compress().decompressPage(pageBuff, pageStore.getPageSize()); + + BPlusMetaIO treeIO = BPlusMetaIO.VERSIONS.forPage(pageAddr); + + int rootLvl = treeIO.getRootLevel(pageAddr); + long rootId = treeIO.getFirstPageId(pageAddr, rootLvl); + + pageBuff.clear(); + pageStore.read(rootId, pageBuff, true); + + if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE) + ctx.compress().decompressPage(pageBuff, pageStore.getPageSize()); + + BPlusIO rootIO = PageIO.getPageIO(pageBuff); + + return rootIO.getCount(pageAddr) != 0; + } + /** {@inheritDoc} */ @Override public void complete(String name, Collection>> results) throws IgniteCheckedException { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java index b93e1cf139461..79c1806d08509 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java @@ -304,6 +304,9 @@ public static List compareUpdateCounters( while (it.hasNextX()) { CacheDataRow row = it.nextX(); + if (row.expireTime() > 0) + continue; + partHash += row.key().hashCode(); partVerHash += row.version().hashCode(); // Detects ABA problem. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java index df7c98452d15e..852fe856d3e42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java @@ -95,6 +95,10 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject { @GridToStringExclude private int regKeys; + /** If partition has entries to expire. */ + @GridToStringExclude + private boolean hasExpiringEntries; + /** * @param partKey Partition key. * @param isPrimary Is primary. @@ -219,6 +223,16 @@ public int regularKeys() { return regKeys; } + /** */ + public boolean hasExpiringEntries() { + return hasExpiringEntries; + } + + /** */ + public void hasExpiringEntries(boolean hasExpiringEntries) { + this.hasExpiringEntries = hasExpiringEntries; + } + /** {@inheritDoc} */ @Override protected void writeExternalData(ObjectOutput out) throws IOException { out.writeObject(partKey); @@ -233,6 +247,7 @@ public int regularKeys() { out.writeInt(noCfKeys); out.writeInt(binKeys); out.writeInt(regKeys); + out.writeBoolean(hasExpiringEntries); } /** {@inheritDoc} */ @@ -255,6 +270,7 @@ public int regularKeys() { noCfKeys = in.readInt(); binKeys = in.readInt(); regKeys = in.readInt(); + hasExpiringEntries = in.readBoolean(); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java index f9286a2510d43..c3e811d8e7173 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java @@ -36,7 +36,11 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; @@ -600,6 +604,32 @@ public void testClusterSnapshotCheckMultipleTimes() throws Exception { assertTrue("Threads created: " + createdThreads, createdThreads < iterations); } + /** */ + @Test + public void testClusterSnapshotCheckWithExpiring() throws Exception { + IgniteEx ignite = startGrids(3); + + ignite.cluster().state(ACTIVE); + + IgniteCache cache = ignite.getOrCreateCache(new CacheConfiguration<>("expCache") + .setAffinity(new RendezvousAffinityFunction(false, 32)).setBackups(1)); + + Random rnd = new Random(); + + for (int i = 0; i < 10_000; i++) { + cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, + rnd.nextInt(10_000)))).put(i, i); + } + + long timeout = getTestTimeout(); + + snp(ignite).createSnapshot(SNAPSHOT_NAME).get(timeout); + + SnapshotPartitionsVerifyTaskResult res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get(timeout); + + assertFalse(res.idleVerifyResult().hasConflicts()); + } + /** * @param cls Class of running task. * @param results Results of compute.