Skip to content

Commit

Permalink
IGNITE-20722 Support skipCopies for DumpReader (#11014)
Browse files Browse the repository at this point in the history
  • Loading branch information
yurinaryshkin authored Oct 30, 2023
1 parent a7bc3d2 commit 63f1a80
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 8 deletions.
12 changes: 12 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -108,11 +109,22 @@ public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {

AtomicBoolean skip = new AtomicBoolean(false);

Map<Integer, Set<Integer>> groups = cfg.skipCopies() ? new HashMap<>() : null;

if (groups != null)
grpToNodes.keySet().forEach(grpId -> groups.put(grpId, new HashSet<>()));

for (Map.Entry<Integer, List<String>> e : grpToNodes.entrySet()) {
int grp = e.getKey();

for (String node : e.getValue()) {
for (int part : dump.partitions(node, grp)) {
if (groups != null && !groups.get(grp).add(part)) {
log.info("Skip copy partition [node=" + node + ", grp=" + grp + ", part=" + part + ']');

continue;
}

Runnable consumePart = () -> {
if (skip.get()) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,15 @@ public class DumpReaderConfiguration {
/** Cache group names. */
private String[] cacheGroupNames;

/** Skip copies. */
private final boolean skipCopies;

/**
* @param dir Root dump directory.
* @param cnsmr Dump consumer.
*/
public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) {
this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true, null);
this(dir, cnsmr, DFLT_THREAD_CNT, DFLT_TIMEOUT, true, true, null, false);
}

/**
Expand All @@ -75,8 +78,35 @@ public DumpReaderConfiguration(File dir, DumpConsumer cnsmr) {
* @param keepBinary If {@code true} then don't deserialize {@link KeyCacheObject} and {@link CacheObject}.
* @param cacheGroupNames Cache group names.
*/
public DumpReaderConfiguration(File dir, DumpConsumer cnsmr, int thCnt, Duration timeout, boolean failFast, boolean keepBinary,
public DumpReaderConfiguration(File dir,
DumpConsumer cnsmr,
int thCnt,
Duration timeout,
boolean failFast,
boolean keepBinary,
String[] cacheGroupNames
) {
this(dir, cnsmr, thCnt, timeout, failFast, keepBinary, cacheGroupNames, false);
}

/**
* @param dir Root dump directory.
* @param cnsmr Dump consumer.
* @param thCnt Count of threads to consume dumped partitions.
* @param timeout Timeout of dump reader invocation.
* @param failFast Stop processing partitions if consumer fail to process one.
* @param keepBinary If {@code true} then don't deserialize {@link KeyCacheObject} and {@link CacheObject}.
* @param cacheGroupNames Cache group names.
* @param skipCopies Skip copies.
*/
public DumpReaderConfiguration(File dir,
DumpConsumer cnsmr,
int thCnt,
Duration timeout,
boolean failFast,
boolean keepBinary,
String[] cacheGroupNames,
boolean skipCopies
) {
this.dir = dir;
this.cnsmr = cnsmr;
Expand All @@ -85,6 +115,7 @@ public DumpReaderConfiguration(File dir, DumpConsumer cnsmr, int thCnt, Duration
this.failFast = failFast;
this.keepBinary = keepBinary;
this.cacheGroupNames = cacheGroupNames;
this.skipCopies = skipCopies;
}

/** @return Root dump directiory. */
Expand Down Expand Up @@ -121,4 +152,9 @@ public boolean keepBinary() {
public String[] cacheGroupNames() {
return cacheGroupNames;
}

/** @return Skip copies. */
public boolean skipCopies() {
return skipCopies;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ void checkDump(IgniteEx ign, String name) throws Exception {
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
KEYS_CNT);
KEYS_CNT,
false);
}

/** */
Expand All @@ -302,7 +303,8 @@ void checkDump(
Set<String> expectedFoundCaches,
int expectedDfltDumpSz,
int expectedGrpDumpSz,
int expectedCount
int expectedCount,
boolean skipCopies
) throws Exception {
checkDumpWithCommand(ign, name, backups);

Expand Down Expand Up @@ -408,7 +410,8 @@ else if (!cacheName.equals(DEFAULT_CACHE_NAME))
DFLT_THREAD_CNT, DFLT_TIMEOUT,
true,
false,
cacheGroupNames
cacheGroupNames,
skipCopies
),
log
).run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ public void testCacheDumpWithReadGroupFilter() throws Exception {
new HashSet<>(Arrays.asList(CACHE_0, CACHE_1)),
0,
2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
0);
0,
false
);

checkDump(
ign,
Expand All @@ -172,7 +174,8 @@ public void testCacheDumpWithReadGroupFilter() throws Exception {
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME)),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
0,
KEYS_CNT
KEYS_CNT,
false
);

checkDump(
Expand All @@ -182,7 +185,45 @@ public void testCacheDumpWithReadGroupFilter() throws Exception {
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
KEYS_CNT
KEYS_CNT,
false
);
}
finally {
snpPoolSz = 1;
}
}

/** */
@Test
public void testSkipCopies() throws Exception {
snpPoolSz = 4;

try {
IgniteEx ign = startGridAndFillCaches();

createDump(ign);

checkDump(
ign,
DMP_NAME,
null,
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups),
2 * (KEYS_CNT + (onlyPrimary ? 0 : KEYS_CNT * backups)),
KEYS_CNT,
false
);

checkDump(
ign,
DMP_NAME,
null,
new HashSet<>(Arrays.asList(DEFAULT_CACHE_NAME, CACHE_0, CACHE_1)),
KEYS_CNT,
2 * KEYS_CNT,
KEYS_CNT,
true
);
}
finally {
Expand Down

0 comments on commit 63f1a80

Please sign in to comment.