diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 9b3f1346c963b..f4d8fa6d8a5a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -55,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; + import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD; import static org.apache.ignite.IgniteSystemProperties.getFloat; @@ -76,23 +76,23 @@ public class GridAffinityAssignmentCache { * Affinity cache will shrink when total number of non-shallow (see {@link HistoryAffinityAssignmentImpl}) * historical instances will be greater than value of this constant. */ - private final int MAX_NON_SHALLOW_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, DFLT_AFFINITY_HISTORY_SIZE); + final int maxNonShallowHistSize = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, DFLT_AFFINITY_HISTORY_SIZE); /** * Affinity cache will also shrink when total number of both shallow ({@link HistoryAffinityAssignmentShallowCopy}) * and non-shallow (see {@link HistoryAffinityAssignmentImpl}) historical instances will be greater than * value of this constant. */ - private final int MAX_TOTAL_HIST_SIZE = MAX_NON_SHALLOW_HIST_SIZE * 10; + final int maxTotalHistSize = maxNonShallowHistSize * 10; /** - * Independent of {@link #MAX_NON_SHALLOW_HIST_SIZE} and {@link #MAX_TOTAL_HIST_SIZE}, affinity cache will always + * Independent of {@link #maxNonShallowHistSize} and {@link #maxTotalHistSize}, affinity cache will always * keep this number of non-shallow (see {@link HistoryAffinityAssignmentImpl}) instances. * We need at least one real instance, otherwise we won't be able to get affinity cache for * {@link GridCachePartitionExchangeManager#lastAffinityChangedTopologyVersion} in case cluster has experienced * too many client joins / client leaves / local cache starts. */ - private final int MIN_NON_SHALLOW_HIST_SIZE = 2; + private static final int MIN_NON_SHALLOW_HIST_SIZE = 2; /** Partition distribution. */ private final float partDistribution = @@ -144,7 +144,7 @@ public class GridAffinityAssignmentCache { private volatile IgniteCheckedException stopErr; /** Number of non-shallow (see {@link HistoryAffinityAssignmentImpl}) affinity cache instances. */ - private final AtomicInteger nonShallowHistSize = new AtomicInteger(); + private volatile int nonShallowHistSize; /** */ private final Object similarAffKey; @@ -311,7 +311,7 @@ public void onReconnected() { affCache.clear(); - nonShallowHistSize.set(0); + nonShallowHistSize = 0; head.set(new GridAffinityAssignmentV2(AffinityTopologyVersion.NONE)); @@ -575,7 +575,7 @@ public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersio Map.Entry prevHistEntry = affCache.floorEntry(prevVer); - HistoryAffinityAssignment newHistEntry = (prevHistEntry == null || shouldCleanupShallows()) ? + HistoryAffinityAssignment newHistEntry = (prevHistEntry == null) ? new HistoryAffinityAssignmentImpl(assignmentCpy, backups) : new HistoryAffinityAssignmentShallowCopy(prevHistEntry.getValue().origin(), topVer); @@ -759,7 +759,7 @@ public AffinityAssignment readyAffinity(AffinityTopologyVersion topVer) { ", topVer=" + topVer + ", head=" + head.get().topologyVersion() + ", history=" + affCache.keySet() + - ", maxNonShallowHistorySize=" + MAX_NON_SHALLOW_HIST_SIZE + + ", maxNonShallowHistorySize=" + maxNonShallowHistSize + ']'); } } @@ -826,7 +826,7 @@ public AffinityAssignment cachedAffinity( ", lastAffChangeTopVer=" + lastAffChangeTopVer + ", head=" + head.get().topologyVersion() + ", history=" + affCache.keySet() + - ", maxNonShallowHistorySize=" + MAX_NON_SHALLOW_HIST_SIZE + + ", maxNonShallowHistorySize=" + maxNonShallowHistSize + ']'); } @@ -839,7 +839,7 @@ public AffinityAssignment cachedAffinity( ", lastAffChangeTopVer=" + lastAffChangeTopVer + ", head=" + head.get().topologyVersion() + ", history=" + affCache.keySet() + - ", maxNonShallowHistorySize=" + MAX_NON_SHALLOW_HIST_SIZE + + ", maxNonShallowHistorySize=" + maxNonShallowHistSize + ']'); } } @@ -942,67 +942,52 @@ private void awaitTopologyVersion(AffinityTopologyVersion topVer) { * @param replaced Replaced entry in case history item was already present, null otherwise. * @param added New history item. */ - private void onHistoryAdded( + private synchronized void onHistoryAdded( HistoryAffinityAssignment replaced, HistoryAffinityAssignment added ) { - boolean cleanupNeeded = false; - if (replaced == null) { - cleanupNeeded = true; - - if (added.requiresHistoryCleanup()) - nonShallowHistSize.incrementAndGet(); + if (added.isFullSizeInstance()) + nonShallowHistSize++; } - else { - if (replaced.requiresHistoryCleanup() != added.requiresHistoryCleanup()) { - if (added.requiresHistoryCleanup()) { - cleanupNeeded = true; + else if (replaced.isFullSizeInstance() != added.isFullSizeInstance()) + nonShallowHistSize += added.isFullSizeInstance() ? 1 : -1; - nonShallowHistSize.incrementAndGet(); - } - else - nonShallowHistSize.decrementAndGet(); - } - } + int totalSize = affCache.size(); - if (!cleanupNeeded) + if (!shouldContinueCleanup(nonShallowHistSize, totalSize)) return; - int nonShallowSize = nonShallowHistSize.get(); + AffinityTopologyVersion lastAffChangeTopVer = + ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(head.get().topologyVersion()); - int totalSize = affCache.size(); + HistoryAffinityAssignment aff0 = null; - if (shouldContinueCleanup(nonShallowSize, totalSize)) { - int initNonShallowSize = nonShallowSize; + Iterator it = affCache.values().iterator(); - Iterator it = affCache.values().iterator(); + while (it.hasNext() && shouldContinueCleanup(nonShallowHistSize, totalSize)) { + aff0 = it.next(); - while (it.hasNext()) { - HistoryAffinityAssignment aff0 = it.next(); + if (aff0.topologyVersion().equals(lastAffChangeTopVer)) + continue; // Keep lastAffinityChangedTopologyVersion, it's required for some operations. - if (aff0.requiresHistoryCleanup()) { - // We can stop cleanup only on non-shallow item. - // Keeping part of shallow items chain if corresponding real item is missing makes no sense. - if (!shouldContinueCleanup(nonShallowSize, totalSize)) { - nonShallowHistSize.getAndAdd(nonShallowSize - initNonShallowSize); + if (aff0.isFullSizeInstance()) { + if (nonShallowHistSize <= MIN_NON_SHALLOW_HIST_SIZE) + continue; - // GridAffinityProcessor#affMap has the same size and instance set as #affCache. - ctx.affinity().removeCachedAffinity(aff0.topologyVersion()); + nonShallowHistSize--; + } - return; - } + totalSize--; - nonShallowSize--; - } + it.remove(); + } - totalSize--; + assert aff0 != null; - it.remove(); - } + ctx.affinity().removeCachedAffinity(aff0.topologyVersion()); - assert false : "All elements have been removed from affinity cache during cleanup"; - } + assert it.hasNext() : "All elements have been removed from affinity cache during cleanup"; } /** @@ -1013,15 +998,7 @@ private void onHistoryAdded( * @return true if affinity cache cleanup is not finished yet. */ private boolean shouldContinueCleanup(int nonShallowSize, int totalSize) { - if (nonShallowSize <= MIN_NON_SHALLOW_HIST_SIZE) - return false; - - return nonShallowSize > MAX_NON_SHALLOW_HIST_SIZE || totalSize > MAX_TOTAL_HIST_SIZE; - } - - /** */ - private boolean shouldCleanupShallows() { - return nonShallowHistSize.get() <= MIN_NON_SHALLOW_HIST_SIZE && affCache.size() > MAX_TOTAL_HIST_SIZE; + return nonShallowSize > maxNonShallowHistSize || totalSize > maxTotalHistSize; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java index 19bd88c17e6a0..60dc9eb5d23ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java @@ -25,7 +25,7 @@ public interface HistoryAffinityAssignment extends AffinityAssignment { * * @return true if adding this instance to history should trigger size check and possible cleanup. */ - public boolean requiresHistoryCleanup(); + public boolean isFullSizeInstance(); /** * In case this instance is lightweight wrapper of another instance, this method should return reference diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java index 6d30f54425568..d30c522970681 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentImpl.java @@ -331,7 +331,7 @@ private List partitionNodes(int p, boolean ideal, int cpys) { } /** {@inheritDoc} */ - @Override public boolean requiresHistoryCleanup() { + @Override public boolean isFullSizeInstance() { return true; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java index ac9bfa76858e7..c6876215926d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignmentShallowCopy.java @@ -46,7 +46,7 @@ public HistoryAffinityAssignmentShallowCopy( } /** {@inheritDoc} */ - @Override public boolean requiresHistoryCleanup() { + @Override public boolean isFullSizeInstance() { return false; } diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java index 44de7d3e8c681..05f258df7fca7 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityHistoryCleanupTest.java @@ -17,9 +17,9 @@ package org.apache.ignite.cache.affinity; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; @@ -175,12 +175,67 @@ public void testAffinityHistoryCleanup() throws Exception { startGrid(4); checkHistory(ignite, F.asList( + topVer(5, 0), // Client event -> FullHistSize = 5. + topVer(6, 0), // Client event -> FullHistSize = 5. + topVer(7, 0), // Client event -> FullHistSize = 5. + topVer(8, 0), // Client event -> FullHistSize = 5. + topVer(9, 0), // Client event -> FullHistSize = 5. + topVer(10, 0), // Client event -> FullHistSize = 5. topVer(11, 0), // FullHistSize = 5. topVer(11, 1), // FullHistSize = 5. topVer(12, 0), // FullHistSize = 6 - 1 = 5. topVer(13, 0), // FullHistSize = 5. topVer(13, 1)), // FullHistSize = 6 - 1 = 5. 5); + + stopGrid(4); + + checkHistory(ignite, F.asList( + topVer(11, 1), // FullHistSize = 5. + topVer(12, 0), // FullHistSize = 6 - 1 = 5. + topVer(13, 0), // FullHistSize = 5. + topVer(13, 1), // FullHistSize = 6 - 1 = 5. + topVer(14, 0)), // FullHistSize = 6 - 1 = 5. + 5); + } + + /** + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_AFFINITY_HISTORY_SIZE, value = "2") + public void testAffinityShallowHistoryCleanup() throws Exception { + Ignite ignite = startGrid(0); + + checkHistory(ignite, F.asList(topVer(1, 0)), 1); + + for (int i = 0; i < 15; i++) { + startClientGrid(1); + stopGrid(1); + } + + List expVers = new ArrayList<>(); + expVers.add(topVer(1, 0)); + + for (int i = 13; i <= 31; i++) + expVers.add(topVer(i, 0)); + + checkHistory(ignite, expVers, 1); + + startGrid(1); + + expVers.clear(); + + for (int i = 14; i <= 32; i++) + expVers.add(topVer(i, 0)); + + expVers.add(topVer(32, 1)); + + checkHistory(ignite, expVers, 2); + + stopGrid(1); + + checkHistory(ignite, F.asList(topVer(32, 1), topVer(33, 0)), 2); } /** @@ -199,9 +254,9 @@ private void checkHistory(Ignite ignite, List expHist, for (GridCacheContext cctx : proc.context().cacheContexts()) { GridAffinityAssignmentCache aff = GridTestUtils.getFieldValue(cctx.affinity(), "aff"); - AtomicInteger fullHistSize = GridTestUtils.getFieldValue(aff, "nonShallowHistSize"); + int fullHistSize = GridTestUtils.getFieldValue(aff, "nonShallowHistSize"); - assertEquals(expSize, fullHistSize.get()); + assertEquals(expSize, fullHistSize); Map cache = GridTestUtils.getFieldValue(aff, "affCache"); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java index 80760c3a41a74..348ad585e1c5e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridHistoryAffinityAssignmentTest.java @@ -20,11 +20,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -157,4 +159,40 @@ public void testAffinityCacheSizeOnReconnect() throws Exception { } } } + + /** */ + @Test + public void testAffinityCacheMaxSize() throws Exception { + try (IgniteEx server = startGrids(2)) { + server.getOrCreateCache(DEFAULT_CACHE_NAME); + + CacheGroupContext gctx = server.context().cache().cache(DEFAULT_CACHE_NAME).context().group(); + Set affCache = gctx.affinity().cachedVersions(); + int maxTotalHistSize = gctx.affinity().maxTotalHistSize; + int maxNonShallowHistSize = gctx.affinity().maxNonShallowHistSize; + + for (int i = 0; i < (maxTotalHistSize - 10) / 2; i++) { + server.createCache(DEFAULT_CACHE_NAME + i); + server.destroyCache(DEFAULT_CACHE_NAME + i); + } + + startClientGridsMultiThreaded(2, 10); + + assertEquals(maxTotalHistSize, affCache.size()); + + for (int i = 0; i < maxTotalHistSize / 2; i++) { + server.createCache(DEFAULT_CACHE_NAME + i); + server.destroyCache(DEFAULT_CACHE_NAME + i); + } + + assertEquals(maxTotalHistSize, affCache.size()); + + for (int i = 0; i < maxNonShallowHistSize / 2 + 1; i++) { + stopGrid(1); + startGrid(1); + } + + assertEquals(maxNonShallowHistSize, affCache.size()); + } + } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlAffinityCacheTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlAffinityCacheTest.java new file mode 100644 index 0000000000000..01567e11447c9 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlAffinityCacheTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; +import org.junit.Test; + +/** + * + */ +public class SqlAffinityCacheTest extends AbstractIndexingCommonTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setIndexedTypes(Integer.class, String.class)); + } + + /** + * Test that affinity assignment instance (required for queries) is not removed from affinity cache + * after numerous client node left/join events. + */ + @Test + public void testAffinityCache() throws Exception { + startGrid(0); + + for (int i = 0; i < 300; i++) { + startClientGrid(1); + stopGrid(1); + } + + IgniteCache cache = grid(0).cache(DEFAULT_CACHE_NAME); + cache.put(0, "0"); + + assertEquals(1, cache.query(new SqlFieldsQuery("SELECT * FROM String")).getAll().size()); + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java index 3d2732584b35f..e0013831966cf 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.query.IgniteSqlCreateTableTemplateTest; import org.apache.ignite.internal.processors.query.LocalQueryLazyTest; import org.apache.ignite.internal.processors.query.LongRunningQueryTest; +import org.apache.ignite.internal.processors.query.SqlAffinityCacheTest; import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest; import org.apache.ignite.internal.processors.query.SqlPartOfComplexPkLookupTest; import org.apache.ignite.internal.processors.query.SqlQueriesTopologyMappingTest; @@ -93,8 +94,8 @@ SqlQueriesTopologyMappingTest.class, - IgniteCacheQueryReservationOnUnstableTopologyTest.class - + IgniteCacheQueryReservationOnUnstableTopologyTest.class, + SqlAffinityCacheTest.class, }) public class IgniteBinaryCacheQueryTestSuite4 { /** Setup lazy mode default. */