diff --git a/CHANGES.md b/CHANGES.md
index 4c96673d4..9d1e61ef1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,8 @@
## Version 1.0.0 (Not yet Release)
+* Implement ReplicationStateImpl to Manage and Cache Token Range to Replica Mappings - Issue #719
+* Implement NodeResolverImpl to Resolve Nodes by IP Address and UUID - Issue #718
* Specify Interval for Next Connection - Issue #674
* Retry Policy for Jmx Connection - Issue #700
* Update Architecture and Tests Documentations to Add the Agent Features and The cassandra-test-image - Issue #707
diff --git a/application/pom.xml b/application/pom.xml
index 175ba585d..1c1bad561 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -50,6 +50,18 @@
${project.version}
+
+ com.ericsson.bss.cassandra.ecchronos
+ core
+ ${project.version}
+
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ core.impl
+ ${project.version}
+
+
org.springframework.boot
diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java
index 30f582e0a..2d7ebfb7c 100644
--- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java
+++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java
@@ -14,11 +14,16 @@
*/
package com.ericsson.bss.cassandra.ecchronos.application.spring;
+import com.datastax.oss.driver.api.core.CqlSession;
import com.ericsson.bss.cassandra.ecchronos.application.config.Interval;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.CqlTLSConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.ReloadingCertificateHandler;
import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.state.ReplicationStateImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
import com.ericsson.bss.cassandra.ecchronos.data.exceptions.EcChronosException;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
@@ -220,6 +225,22 @@ public RetrySchedulerService retrySchedulerService(final Config config,
return new RetrySchedulerService(eccNodesSync, config, jmxConnectionProvider, nativeConnectionProvider);
}
+ @Bean
+ public NodeResolver nodeResolver(final DistributedNativeConnectionProvider distributedNativeConnectionProvider)
+ {
+ CqlSession session = distributedNativeConnectionProvider.getCqlSession();
+ return new NodeResolverImpl(session);
+ }
+
+ @Bean
+ public ReplicationState replicationState(
+ final DistributedNativeConnectionProvider distributedNativeConnectionProvider,
+ final NodeResolver nodeResolver)
+ {
+ CqlSession session = distributedNativeConnectionProvider.getCqlSession();
+ return new ReplicationStateImpl(nodeResolver, session);
+ }
+
private Security getSecurityConfig() throws ConfigurationException
{
return ConfigurationHelper.DEFAULT_INSTANCE.getConfiguration(SECURITY_FILE, Security.class);
diff --git a/core.impl/pom.xml b/core.impl/pom.xml
new file mode 100644
index 000000000..675e2eb89
--- /dev/null
+++ b/core.impl/pom.xml
@@ -0,0 +1,102 @@
+
+
+
+ 4.0.0
+
+ com.ericsson.bss.cassandra.ecchronos
+ agent
+ 1.0.0-SNAPSHOT
+
+
+ core.impl
+
+
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ core
+ ${project.version}
+
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ connection
+ ${project.version}
+
+
+
+ com.datastax.oss
+ java-driver-core
+
+
+
+
+ com.datastax.oss
+ java-driver-query-builder
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.github.ben-manes.caffeine
+ caffeine
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ test
+
+
+
+ commons-io
+ commons-io
+ test
+
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
\ No newline at end of file
diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/NodeResolverImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/NodeResolverImpl.java
new file mode 100644
index 000000000..09b320bfc
--- /dev/null
+++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/NodeResolverImpl.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.metadata;
+
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
+import java.net.InetAddress;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+
+public class NodeResolverImpl implements NodeResolver
+{
+ private final ConcurrentMap addressToNodeMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap idToNodeMap = new ConcurrentHashMap<>();
+
+ private final CqlSession session;
+
+ public NodeResolverImpl(final CqlSession aSession)
+ {
+ this.session = aSession;
+ }
+
+ @Override
+ public final Optional fromIp(final InetAddress inetAddress)
+ {
+ DriverNode node = addressToNodeMap.get(inetAddress);
+
+ if (node == null)
+ {
+ node = addressToNodeMap.computeIfAbsent(inetAddress, address -> lookup(inetAddress));
+ }
+ else if (!inetAddress.equals(node.getPublicAddress()))
+ {
+ // IP mapping is wrong, we should remove the old entry and retry
+ addressToNodeMap.remove(inetAddress, node);
+ return fromIp(inetAddress);
+ }
+
+ return Optional.ofNullable(node);
+ }
+
+ @Override
+ public final Optional fromUUID(final UUID nodeId)
+ {
+ return Optional.ofNullable(resolve(nodeId));
+ }
+
+ private DriverNode resolve(final UUID nodeId)
+ {
+ DriverNode node = idToNodeMap.get(nodeId);
+ if (node == null)
+ {
+ node = idToNodeMap.computeIfAbsent(nodeId, this::lookup);
+ }
+
+ return node;
+ }
+
+ private DriverNode lookup(final UUID nodeId)
+ {
+ Metadata metadata = session.getMetadata();
+ for (Node node : metadata.getNodes().values())
+ {
+ if (node.getHostId().equals(nodeId))
+ {
+ return new DriverNode(node);
+ }
+ }
+ return null;
+ }
+
+ private DriverNode lookup(final InetAddress inetAddress)
+ {
+ Metadata metadata = session.getMetadata();
+ for (Node node : metadata.getNodes().values())
+ {
+ if (node.getBroadcastAddress().get().getAddress().equals(inetAddress))
+ {
+ return resolve(node.getHostId());
+ }
+ }
+ return null;
+ }
+}
diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/package-info.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/package-info.java
new file mode 100644
index 000000000..9d80d660d
--- /dev/null
+++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains the implementations and resources for mapping node metadata.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.metadata;
diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/ReplicationStateImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/ReplicationStateImpl.java
new file mode 100644
index 000000000..adc108ade
--- /dev/null
+++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/ReplicationStateImpl.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.state;
+
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
+import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.core.metadata.TokenMap;
+import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
+import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.ericsson.bss.cassandra.ecchronos.core.metadata.Metadata.quoteIfNeeded;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Utility class to generate a token -> replicas map for a specific table.
+ */
+public class ReplicationStateImpl implements ReplicationState
+{
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationStateImpl.class);
+
+ private static final Map>>
+ KEYSPACE_REPLICATION_CACHE = new ConcurrentHashMap<>();
+ private static final Map>>
+ CLUSTER_WIDE_KEYSPACE_REPLICATION_CACHE = new ConcurrentHashMap<>();
+
+ private final NodeResolver myNodeResolver;
+ private final CqlSession mySession;
+
+ public ReplicationStateImpl(final NodeResolver nodeResolver, final CqlSession session)
+ {
+ myNodeResolver = nodeResolver;
+ mySession = session;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ImmutableSet getNodes(
+ final TableReference tableReference,
+ final LongTokenRange tokenRange,
+ final Node currentNode)
+ {
+ String keyspace = tableReference.getKeyspace();
+
+ ImmutableMap> replication = maybeRenew(keyspace, currentNode);
+ return getNodes(replication, tokenRange);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ImmutableSet getReplicas(
+ final TableReference tableReference,
+ final Node currentNode)
+ {
+ Map> tokens = getTokenRangeToReplicas(tableReference, currentNode);
+ Set allReplicas = new HashSet<>();
+ for (ImmutableSet replicas : tokens.values())
+ {
+ allReplicas.addAll(replicas);
+ }
+ return ImmutableSet.copyOf(allReplicas);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ImmutableSet getNodesClusterWide(
+ final TableReference tableReference,
+ final LongTokenRange tokenRange,
+ final Node currentNode)
+ {
+ String keyspace = tableReference.getKeyspace();
+
+ ImmutableMap> replication = maybeRenewClusterWide(keyspace, currentNode);
+ return getNodes(replication, tokenRange);
+ }
+
+ private ImmutableSet getNodes(final ImmutableMap> replication,
+ final LongTokenRange tokenRange)
+ {
+ ImmutableSet nodes = replication.get(tokenRange);
+
+ if (nodes == null)
+ {
+ for (Map.Entry> entry : replication.entrySet())
+ {
+ if (entry.getKey().isCovering(tokenRange))
+ {
+ nodes = entry.getValue();
+ break;
+ }
+ }
+ }
+
+ return nodes;
+ }
+
+ /**
+ * Get token ranges to replicas.
+ *
+ * @param tableReference
+ * The table used to calculate the proper replication.
+ * @return Nodes and their ranges
+ */
+ @Override
+ public Map> getTokenRangeToReplicas(
+ final TableReference tableReference,
+ final Node currentNode)
+ {
+ String keyspace = tableReference.getKeyspace();
+ return maybeRenew(keyspace, currentNode);
+ }
+
+ private ImmutableMap> maybeRenew(
+ final String keyspace,
+ final Node currentNode)
+ {
+ ImmutableMap> replication = buildTokenMap(
+ keyspace,
+ false,
+ currentNode);
+
+ return KEYSPACE_REPLICATION_CACHE.compute(keyspace, (k, v) -> !replication.equals(v) ? replication : v);
+ }
+
+ /**
+ * Get token ranges.
+ *
+ * @param tableReference Table reference.
+ * @return Nodes and their ranges
+ */
+ @Override
+ public Map> getTokenRanges(
+ final TableReference tableReference,
+ final Node currentNode)
+ {
+ String keyspace = tableReference.getKeyspace();
+ return maybeRenewClusterWide(keyspace, currentNode);
+ }
+
+ private ImmutableMap> maybeRenewClusterWide(
+ final String keyspace,
+ final Node currentNode)
+ {
+ ImmutableMap> replication = buildTokenMap(
+ keyspace,
+ true,
+ currentNode);
+
+ return CLUSTER_WIDE_KEYSPACE_REPLICATION_CACHE
+ .compute(keyspace, (k, v) -> !replication.equals(v) ? replication : v);
+ }
+
+ private ImmutableMap> buildTokenMap(
+ final String keyspace,
+ final boolean clusterWide,
+ final Node currentNode)
+ {
+ ImmutableMap.Builder> replicationBuilder = ImmutableMap.builder();
+ Map, ImmutableSet> replicaCache = new HashMap<>();
+ Metadata metadata = mySession.getMetadata();
+ Optional tokenMap = metadata.getTokenMap();
+ if (!tokenMap.isPresent())
+ {
+ throw new IllegalStateException("Cannot determine ranges, is metadata/tokenMap disabled?");
+ }
+ String keyspaceName = quoteIfNeeded(keyspace);
+ Set tokenRanges;
+ if (clusterWide)
+ {
+ tokenRanges = tokenMap.get().getTokenRanges();
+ }
+ else
+ {
+ tokenRanges = tokenMap.get().getTokenRanges(keyspaceName, currentNode);
+ }
+ for (TokenRange tokenRange : tokenRanges)
+ {
+ LongTokenRange longTokenRange = convert(tokenRange);
+ ImmutableSet replicas
+ = replicaCache.computeIfAbsent(tokenMap.get().getReplicas(keyspaceName, tokenRange), this::convert);
+
+ replicationBuilder.put(longTokenRange, replicas);
+ }
+
+ return replicationBuilder.build();
+ }
+
+ private ImmutableSet convert(final Set nodes)
+ {
+ ImmutableSet.Builder builder = new ImmutableSet.Builder<>();
+ for (Node node : nodes)
+ {
+ Optional broadcastAddress = node.getBroadcastAddress();
+ if (broadcastAddress.isPresent())
+ {
+ Optional resolvedNode = myNodeResolver.fromIp(broadcastAddress.get().getAddress());
+ if (resolvedNode.isPresent())
+ {
+ builder.add(resolvedNode.get());
+ }
+ else
+ {
+ LOG.warn("Node {} - {} not found in node resolver", node.getHostId(), broadcastAddress.get());
+ }
+ }
+ else
+ {
+ LOG.warn("Could not determine broadcast address for node {}", node.getHostId());
+ }
+ }
+ return builder.build();
+ }
+
+ private LongTokenRange convert(final TokenRange range)
+ {
+ // Assuming murmur3 partitioner
+ long start = ((Murmur3Token) range.getStart()).getValue();
+ long end = ((Murmur3Token) range.getEnd()).getValue();
+ return new LongTokenRange(start, end);
+ }
+}
diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/package-info.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/package-info.java
new file mode 100644
index 000000000..860498539
--- /dev/null
+++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains the implementation and resources for stateful declarations.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.state;
diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/TestNodeResolverImpl.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/TestNodeResolverImpl.java
new file mode 100644
index 000000000..469170020
--- /dev/null
+++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/TestNodeResolverImpl.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.metadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestNodeResolverImpl
+{
+ @Mock
+ private Metadata mockMetadata;
+
+ @Mock
+ private CqlSession mockCqlSession;
+
+ private Map mockedNodes = new HashMap<>();
+
+ private NodeResolver nodeResolver;
+
+ @Before
+ public void setup() throws Exception
+ {
+ when(mockMetadata.getNodes()).thenReturn(mockedNodes);
+
+ // Add two dummy hosts so that we know we find the correct host
+ addNode(new InetSocketAddress(address("127.0.0.2"), 9042), "dc1");
+ addNode(new InetSocketAddress(address("127.0.0.3"), 9042), "dc1");
+
+ when(mockCqlSession.getMetadata()).thenReturn(mockMetadata);
+
+ nodeResolver = new NodeResolverImpl(mockCqlSession);
+ }
+
+ @Test
+ public void testGetHost() throws Exception
+ {
+ Node node = addNode(new InetSocketAddress(address("127.0.0.1"), 9042), "dc1");
+
+ Optional maybeNode = nodeResolver.fromIp(address("127.0.0.1"));
+ assertThat(maybeNode).isPresent();
+ assertThat(maybeNode.get().getId()).isEqualTo(node.getHostId());
+ assertThat(maybeNode.get().getPublicAddress()).isEqualTo(address("127.0.0.1"));
+ assertThat(maybeNode.get().getDatacenter()).isEqualTo("dc1");
+
+ assertThat(nodeResolver.fromIp(address("127.0.0.1"))).containsSame(maybeNode.get());
+ assertThat(nodeResolver.fromUUID(node.getHostId())).containsSame(maybeNode.get());
+ }
+
+ @Test
+ public void testChangeIpAddress() throws Exception
+ {
+ Node node = addNode(new InetSocketAddress(address("127.0.0.1"), 9042), "dc1");
+
+ Optional maybeNode = nodeResolver.fromIp(address("127.0.0.1"));
+ assertThat(maybeNode).isPresent();
+
+ assertThat(maybeNode.get().getPublicAddress()).isEqualTo(address("127.0.0.1"));
+
+ when(node.getBroadcastAddress()).thenReturn(Optional.of(new InetSocketAddress(address("127.0.0.5"), 9042)));
+
+ assertThat(maybeNode.get().getId()).isEqualTo(node.getHostId());
+ assertThat(maybeNode.get().getPublicAddress()).isEqualTo(address("127.0.0.5"));
+ assertThat(maybeNode.get().getDatacenter()).isEqualTo("dc1");
+
+ // New mapping for the node
+ assertThat(nodeResolver.fromIp(address("127.0.0.5"))).containsSame(maybeNode.get());
+ assertThat(nodeResolver.fromUUID(node.getHostId())).containsSame(maybeNode.get());
+
+ // Make sure the old mapping is removed
+ assertThat(nodeResolver.fromIp(address("127.0.0.1"))).isEmpty();
+ }
+
+ @Test
+ public void testChangeIpAddressAndAddNewReplica() throws Exception
+ {
+ Node node = addNode(new InetSocketAddress(address("127.0.0.1"), 9042), "dc1");
+
+ Optional maybeNode = nodeResolver.fromIp(address("127.0.0.1"));
+ assertThat(maybeNode).isPresent();
+
+ assertThat(maybeNode.get().getPublicAddress()).isEqualTo(address("127.0.0.1"));
+
+ when(node.getBroadcastAddress()).thenReturn(Optional.of(new InetSocketAddress(address("127.0.0.5"), 9042)));
+
+ assertThat(maybeNode.get().getId()).isEqualTo(node.getHostId());
+ assertThat(maybeNode.get().getPublicAddress()).isEqualTo(address("127.0.0.5"));
+ assertThat(maybeNode.get().getDatacenter()).isEqualTo("dc1");
+
+ // New mapping for the node
+ assertThat(nodeResolver.fromIp(address("127.0.0.5"))).containsSame(maybeNode.get());
+ assertThat(nodeResolver.fromUUID(node.getHostId())).containsSame(maybeNode.get());
+
+ // If a new node is using the old ip we should return it
+ Node newNode = addNode(new InetSocketAddress(address("127.0.0.1"), 9042), "dc2");
+
+ Optional maybeNewNode = nodeResolver.fromIp(address("127.0.0.1"));
+ assertThat(maybeNewNode).isPresent();
+
+ assertThat(maybeNewNode.get().getId()).isEqualTo(newNode.getHostId());
+ assertThat(maybeNewNode.get().getPublicAddress()).isEqualTo(address("127.0.0.1"));
+ assertThat(maybeNewNode.get().getDatacenter()).isEqualTo("dc2");
+ assertThat(nodeResolver.fromUUID(newNode.getHostId())).containsSame(maybeNewNode.get());
+
+ assertThat(maybeNewNode.get()).isNotSameAs(maybeNode.get());
+ }
+
+ @Test
+ public void testGetNonExistingHost() throws Exception
+ {
+ Optional maybeNode = nodeResolver.fromIp(address("127.0.0.1"));
+ assertThat(maybeNode).isEmpty();
+
+ maybeNode = nodeResolver.fromUUID(UUID.randomUUID());
+ assertThat(maybeNode).isEmpty();
+ }
+
+ private InetAddress address(String address) throws UnknownHostException
+ {
+ return InetAddress.getByName(address);
+ }
+
+ private Node addNode(InetSocketAddress broadcastAddress, String dataCenter)
+ {
+ Node node = mock(Node.class);
+
+ UUID id = UUID.randomUUID();
+ when(node.getHostId()).thenReturn(id);
+ when(node.getBroadcastAddress()).thenReturn(Optional.of(broadcastAddress));
+ when(node.getDatacenter()).thenReturn(dataCenter);
+
+ mockedNodes.put(id, node);
+ when(mockMetadata.getNodes()).thenReturn(mockedNodes);
+ return node;
+ }
+}
diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TestReplicationStateImpl.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TestReplicationStateImpl.java
new file mode 100644
index 000000000..4fbe4cd1c
--- /dev/null
+++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TestReplicationStateImpl.java
@@ -0,0 +1,407 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.state;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.core.metadata.TokenMap;
+import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
+import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Optional;
+
+
+import static com.ericsson.bss.cassandra.ecchronos.core.impl.table.MockTableReferenceFactory.tableReference;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestReplicationStateImpl
+{
+ @Mock
+ private NodeResolver mockNodeResolver;
+
+ @Mock
+ private CqlSession mockSession;
+
+ @Mock
+ private Metadata mockMetadata;
+
+ @Mock
+ private TokenMap mockTokenMap;
+
+ @Mock
+ private Node mockReplica1;
+
+ @Mock
+ private Node mockReplica2;
+
+ @Mock
+ private Node mockReplica3;
+
+ @Mock
+ private Node mockReplica4;
+
+ @Mock
+ private DriverNode mockNode1;
+
+ @Mock
+ private DriverNode mockNode2;
+
+ @Mock
+ private DriverNode mockNode3;
+
+ @Mock
+ private DriverNode mockNode4;
+
+ @Before
+ public void setup() throws Exception
+ {
+ InetAddress address1 = InetAddress.getByName("127.0.0.1");
+ InetSocketAddress address11 = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9042);
+ InetAddress address2 = InetAddress.getByName("127.0.0.2");
+ InetSocketAddress address22 = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 9042);
+ InetAddress address3 = InetAddress.getByName("127.0.0.3");
+ InetSocketAddress address33 = new InetSocketAddress(InetAddress.getByName("127.0.0.3"), 9042);
+ InetAddress address4 = InetAddress.getByName("127.0.0.4");
+ InetSocketAddress address44 = new InetSocketAddress(InetAddress.getByName("127.0.0.4"), 9042);
+
+ when(mockReplica1.getBroadcastAddress()).thenReturn(Optional.of(address11));
+ when(mockReplica2.getBroadcastAddress()).thenReturn(Optional.of(address22));
+ when(mockReplica3.getBroadcastAddress()).thenReturn(Optional.of(address33));
+ when(mockReplica4.getBroadcastAddress()).thenReturn(Optional.of(address44));
+
+ when(mockNodeResolver.fromIp(eq(address1))).thenReturn(Optional.of(mockNode1));
+ when(mockNodeResolver.fromIp(eq(address2))).thenReturn(Optional.of(mockNode2));
+ when(mockNodeResolver.fromIp(eq(address3))).thenReturn(Optional.of(mockNode3));
+ when(mockNodeResolver.fromIp(eq(address4))).thenReturn(Optional.of(mockNode4));
+
+ when(mockMetadata.getTokenMap()).thenReturn(Optional.of(mockTokenMap));
+ when(mockSession.getMetadata()).thenReturn(mockMetadata);
+ }
+
+ @Test
+ public void testGetTokenRangeToReplicaSingleToken() throws Exception
+ {
+ LongTokenRange range1 = new LongTokenRange(1, 2);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange = TokenUtil.getRange(1, 2);
+
+ doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(eq("ks"), eq(mockReplica1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ Map> tokenRangeToReplicas = replicationState.getTokenRangeToReplicas(
+ tableReference, mockReplica1);
+
+ assertThat(tokenRangeToReplicas.keySet()).containsExactlyInAnyOrder(range1);
+ assertThat(tokenRangeToReplicas.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3);
+
+ assertThat(replicationState.getNodes(tableReference, range1, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range1));
+ }
+
+ @Test
+ public void testGetTokenRangeToReplica() throws Exception
+ {
+ LongTokenRange range1 = new LongTokenRange(1, 2);
+ LongTokenRange range2 = new LongTokenRange(2, 3);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange1 = TokenUtil.getRange(1, 2);
+ TokenRange tokenRange2 = TokenUtil.getRange(2, 3);
+
+ doReturn(Sets.newHashSet(tokenRange1, tokenRange2)).when(mockTokenMap)
+ .getTokenRanges(eq("ks"), eq(mockReplica1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica3)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange2));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ Map> tokenRangeToReplicas = replicationState.getTokenRangeToReplicas(
+ tableReference, mockReplica1);
+
+ assertThat(tokenRangeToReplicas.keySet()).containsExactlyInAnyOrder(range1, range2);
+ assertThat(tokenRangeToReplicas.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2);
+ assertThat(tokenRangeToReplicas.get(range2)).containsExactlyInAnyOrder(mockNode1, mockNode3);
+
+ assertThat(replicationState.getNodes(tableReference, range1, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range1));
+ assertThat(replicationState.getNodes(tableReference, range2, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range2));
+ }
+
+ @Test
+ public void testGetReplicas() throws Exception
+ {
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange1 = TokenUtil.getRange(1, 2);
+ TokenRange tokenRange2 = TokenUtil.getRange(2, 3);
+
+ doReturn(Sets.newHashSet(tokenRange1, tokenRange2)).when(mockTokenMap)
+ .getTokenRanges(eq("ks"), eq(mockReplica1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica3)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange2));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ ImmutableSet replicas = replicationState.getReplicas(tableReference, mockReplica1);
+
+ assertThat(replicas).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3);
+ }
+
+ @Test
+ public void testGetTokenRangeToReplicaSetReuse() throws Exception
+ {
+ LongTokenRange range1 = new LongTokenRange(1, 2);
+ LongTokenRange range2 = new LongTokenRange(2, 3);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange1 = TokenUtil.getRange(1, 2);
+ TokenRange tokenRange2 = TokenUtil.getRange(2, 3);
+
+ doReturn(Sets.newHashSet(tokenRange1, tokenRange2)).when(mockTokenMap)
+ .getTokenRanges(eq("ks"), eq(mockReplica1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange2));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ Map> tokenRangeToReplicas = replicationState.getTokenRangeToReplicas(
+ tableReference, mockReplica1);
+
+ assertThat(tokenRangeToReplicas.keySet()).containsExactlyInAnyOrder(range1, range2);
+ assertThat(tokenRangeToReplicas.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2);
+ assertThat(tokenRangeToReplicas.get(range1)).isSameAs(tokenRangeToReplicas.get(range2));
+
+ assertThat(replicationState.getNodes(tableReference, range1, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range1));
+ assertThat(replicationState.getNodes(tableReference, range2, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range2));
+ }
+
+ @Test
+ public void testGetTokenRangeToReplicaMapReuse() throws Exception
+ {
+ LongTokenRange range1 = new LongTokenRange(1, 2);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange = TokenUtil.getRange(1, 2);
+
+ doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(eq("ks"), eq(mockReplica1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ Map> tokenRangeToReplicas = replicationState.getTokenRangeToReplicas(
+ tableReference, mockReplica1);
+
+ assertThat(tokenRangeToReplicas.keySet()).containsExactlyInAnyOrder(range1);
+ assertThat(tokenRangeToReplicas.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3);
+
+ assertThat(replicationState.getTokenRangeToReplicas(tableReference, mockReplica1)).isSameAs(tokenRangeToReplicas);
+
+ assertThat(replicationState.getNodes(tableReference, range1, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range1));
+ }
+
+ @Test
+ public void testGetNodesForSubRange() throws Exception
+ {
+ LongTokenRange subRange = new LongTokenRange(2, 3);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange = TokenUtil.getRange(1, 5);
+
+ doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(eq("ks"), eq(mockReplica1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ ImmutableSet nodes = replicationState.getNodes(tableReference, subRange, mockReplica1);
+
+ assertThat(nodes).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3);
+ }
+
+ @Test
+ public void testGetNodesForNonExistingSubRange() throws Exception
+ {
+ LongTokenRange subRange = new LongTokenRange(6, 7);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange = TokenUtil.getRange(1, 5);
+
+ doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(eq("ks"), eq(mockReplica1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ assertThat(replicationState.getNodes(tableReference, subRange, mockReplica1)).isNull();
+ }
+
+ @Test
+ public void testGetNodesForIntersectingSubRange() throws Exception
+ {
+ LongTokenRange subRange = new LongTokenRange(4, 7);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange existingRange = TokenUtil.getRange(1, 5);
+ TokenRange existingRange2 = TokenUtil.getRange(5, 9);
+
+ doReturn(Sets.newHashSet(existingRange, existingRange2)).when(mockTokenMap)
+ .getTokenRanges(eq("ks"), eq(mockReplica1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(existingRange));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(existingRange2));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ assertThat(replicationState.getNodes(tableReference, subRange, mockReplica1)).isNull();
+ }
+
+ @Test
+ public void testGetNodesClusterWideForSubRange() throws Exception
+ {
+ LongTokenRange subRange = new LongTokenRange(2, 3);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange = TokenUtil.getRange(1, 5);
+
+ doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges();
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ ImmutableSet nodes = replicationState.getNodesClusterWide(tableReference, subRange, mockReplica1);
+
+ assertThat(nodes).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3);
+ }
+
+ @Test
+ public void testGetNodesClusterWideForNonExistingSubRange() throws Exception
+ {
+ LongTokenRange subRange = new LongTokenRange(6, 7);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange = TokenUtil.getRange(1, 5);
+
+ doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges();
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ assertThat(replicationState.getNodesClusterWide(tableReference, subRange, mockReplica1)).isNull();
+ }
+
+ @Test
+ public void testGetNodesClusterWideForIntersectingSubRange() throws Exception
+ {
+ LongTokenRange subRange = new LongTokenRange(4, 7);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange existingRange = TokenUtil.getRange(1, 5);
+ TokenRange existingRange2 = TokenUtil.getRange(5, 9);
+
+ doReturn(Sets.newHashSet(existingRange, existingRange2)).when(mockTokenMap)
+ .getTokenRanges();
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(existingRange));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(existingRange2));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ assertThat(replicationState.getNodesClusterWide(tableReference, subRange, mockReplica1)).isNull();
+ }
+
+ @Test
+ public void testGetTokenRanges() throws Exception
+ {
+ LongTokenRange range1 = new LongTokenRange(1, 2);
+ LongTokenRange range2 = new LongTokenRange(2, 3);
+ LongTokenRange range3 = new LongTokenRange(3, 4);
+ LongTokenRange range4 = new LongTokenRange(4, 5);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange1 = TokenUtil.getRange(1, 2);
+ TokenRange tokenRange2 = TokenUtil.getRange(2, 3);
+ TokenRange tokenRange3 = TokenUtil.getRange(3, 4);
+ TokenRange tokenRange4 = TokenUtil.getRange(4, 5);
+
+ doReturn(Sets.newHashSet(tokenRange1, tokenRange2, tokenRange3, tokenRange4)).when(mockTokenMap)
+ .getTokenRanges();
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange1));
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange2));
+ doReturn(Sets.newHashSet(mockReplica2, mockReplica3, mockReplica4)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange3));
+ doReturn(Sets.newHashSet(mockReplica2, mockReplica3, mockReplica4)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange4));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ Map> tokenRanges = replicationState.getTokenRanges(tableReference, mockReplica1);
+
+ assertThat(tokenRanges.keySet()).containsExactlyInAnyOrder(range1, range2, range3, range4);
+ assertThat(tokenRanges.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3);
+ assertThat(tokenRanges.get(range2)).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3);
+ assertThat(tokenRanges.get(range3)).containsExactlyInAnyOrder(mockNode2, mockNode3, mockNode4);
+ assertThat(tokenRanges.get(range4)).containsExactlyInAnyOrder(mockNode2, mockNode3, mockNode4);
+ }
+
+ @Test
+ public void testGetTokenRangesReuse() throws Exception
+ {
+ LongTokenRange range1 = new LongTokenRange(1, 2);
+ TableReference tableReference = tableReference("ks", "tb");
+
+ TokenRange tokenRange = TokenUtil.getRange(1, 2);
+
+ doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges();
+ doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap)
+ .getReplicas(eq("ks"), eq(tokenRange));
+
+ ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession);
+
+ Map> tokenRanges = replicationState.getTokenRanges(tableReference, mockReplica1);
+
+ assertThat(tokenRanges.keySet()).containsExactlyInAnyOrder(range1);
+ assertThat(tokenRanges.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3);
+
+ assertThat(replicationState.getTokenRanges(tableReference, mockReplica1)).isSameAs(tokenRanges);
+ }
+}
diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TokenUtil.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TokenUtil.java
new file mode 100644
index 000000000..80ec56dd4
--- /dev/null
+++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TokenUtil.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.state;
+
+import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
+import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
+import com.datastax.oss.driver.internal.core.metadata.token.Murmur3TokenFactory;
+import com.datastax.oss.driver.internal.core.metadata.token.TokenFactory;
+
+public class TokenUtil
+{
+ public static TokenRange getRange(long start, long end) throws Exception
+ {
+ TokenFactory tokenFactory = new Murmur3TokenFactory();
+ return tokenFactory.range(new Murmur3Token(start), new Murmur3Token(end));
+ }
+}
diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/MockTableReferenceFactory.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/MockTableReferenceFactory.java
new file mode 100644
index 000000000..087b17852
--- /dev/null
+++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/MockTableReferenceFactory.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.table;
+
+import com.ericsson.bss.cassandra.ecchronos.core.exceptions.EcChronosException;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
+
+public class MockTableReferenceFactory implements TableReferenceFactory
+{
+ public static final int DEFAULT_GC_GRACE_SECONDS = 7200;
+ private static final ConcurrentMap tableReferences = new ConcurrentHashMap<>();
+
+ @Override
+ public TableReference forTable(String keyspace, String table)
+ {
+ return tableReference(keyspace, table);
+ }
+
+ @Override
+ public TableReference forTable(TableMetadata table)
+ {
+ return tableReference(table);
+ }
+
+ @Override
+ public Set forKeyspace(String keyspace) throws EcChronosException
+ {
+ Set tableReferences = new HashSet<>();
+ tableReferences.add(tableReference(keyspace, "table1"));
+ tableReferences.add(tableReference(keyspace, "table2"));
+ tableReferences.add(tableReference(keyspace, "table3"));
+ return tableReferences;
+ }
+
+ @Override
+ public Set forCluster()
+ {
+ Set tableReferences = new HashSet<>();
+ tableReferences.add(tableReference("keyspace1", "table1"));
+ tableReferences.add(tableReference("keyspace1", "table2"));
+ tableReferences.add(tableReference("keyspace1", "table3"));
+ tableReferences.add(tableReference("keyspace2", "table4"));
+ tableReferences.add(tableReference("keyspace3", "table5"));
+ return tableReferences;
+ }
+
+ public static TableReference tableReference(String keyspace, String table)
+ {
+ return tableReference(keyspace, table, DEFAULT_GC_GRACE_SECONDS);
+ }
+
+ public static TableReference tableReference(String keyspace, String table, int gcGraceSeconds)
+ {
+ TableKey tableKey = new TableKey(keyspace, table);
+ TableReference tableReference = tableReferences.get(tableKey);
+ if (tableReference == null)
+ {
+ tableReference = tableReferences.computeIfAbsent(tableKey,
+ tb -> new MockTableReference(UUID.randomUUID(), keyspace, table, gcGraceSeconds));
+ }
+
+ return tableReference;
+ }
+
+ public static TableReference tableReference(TableMetadata table)
+ {
+ return new MockTableReference(table);
+ }
+
+ static class MockTableReference implements TableReference
+ {
+ private final UUID id;
+ private final String keyspace;
+ private final String table;
+ private final int gcGraceSeconds;
+
+ MockTableReference(UUID id, String keyspace, String table)
+ {
+ this(id, keyspace, table, DEFAULT_GC_GRACE_SECONDS);
+ }
+
+ MockTableReference(TableMetadata tableMetadata)
+ {
+ this(tableMetadata.getId().get(), tableMetadata.getKeyspace().asInternal(),
+ tableMetadata.getName().asInternal(),
+ (int) tableMetadata.getOptions().get(CqlIdentifier.fromInternal("gc_grace_seconds")));
+ }
+
+ MockTableReference(UUID id, String keyspace, String table, int gcGraceSeconds)
+ {
+ this.id = id;
+ this.keyspace = keyspace;
+ this.table = table;
+ this.gcGraceSeconds = gcGraceSeconds;
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return id;
+ }
+
+ @Override
+ public String getTable()
+ {
+ return table;
+ }
+
+ @Override
+ public String getKeyspace()
+ {
+ return keyspace;
+ }
+
+ @Override
+ public int getGcGraceSeconds()
+ {
+ return gcGraceSeconds;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ MockTableReference that = (MockTableReference) o;
+ return id.equals(that.id) &&
+ keyspace.equals(that.keyspace) &&
+ table.equals(that.table);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(id, keyspace, table);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s.%s (mock)", keyspace, table);
+ }
+ }
+
+ static class TableKey
+ {
+ private final String keyspace;
+ private final String table;
+
+ TableKey(String keyspace, String table)
+ {
+ this.keyspace = keyspace;
+ this.table = table;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ TableKey tableKey = (TableKey) o;
+ return keyspace.equals(tableKey.keyspace) &&
+ table.equals(tableKey.table);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(keyspace, table);
+ }
+ }
+}
+
diff --git a/core/pom.xml b/core/pom.xml
new file mode 100644
index 000000000..89900e10f
--- /dev/null
+++ b/core/pom.xml
@@ -0,0 +1,90 @@
+
+
+
+ 4.0.0
+
+ com.ericsson.bss.cassandra.ecchronos
+ agent
+ 1.0.0-SNAPSHOT
+
+
+ core
+
+
+
+
+ com.datastax.oss
+ java-driver-core
+
+
+
+ com.datastax.oss
+ java-driver-query-builder
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.github.ben-manes.caffeine
+ caffeine
+
+
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ test
+
+
+
+ commons-io
+ commons-io
+ test
+
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ nl.jqno.equalsverifier
+ equalsverifier
+ test
+
+
+
+
\ No newline at end of file
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/EcChronosException.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/EcChronosException.java
new file mode 100644
index 000000000..1c468e839
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/EcChronosException.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.exceptions;
+
+/**
+ * Generic exception thrown by schedulers to signal that something went wrong.
+ */
+public class EcChronosException extends Exception
+{
+ private static final long serialVersionUID = 1148561336907867613L;
+
+ public EcChronosException(final String message)
+ {
+ super(message);
+ }
+
+ public EcChronosException(final Throwable t)
+ {
+ super(t);
+ }
+
+ public EcChronosException(final String message, final Throwable t)
+ {
+ super(message, t);
+ }
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/package-info.java
new file mode 100644
index 000000000..423bced6e
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains custom ecChronos exceptions.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.exceptions;
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/DriverNode.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/DriverNode.java
new file mode 100644
index 000000000..6399e7824
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/DriverNode.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.metadata;
+
+import com.datastax.oss.driver.api.core.metadata.Node;
+
+import java.net.InetAddress;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * An internal representation of a node.
+ * This class together with {@link com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver} makes it easier to
+ * translate node IP to host ID and other way around.
+ */
+public class DriverNode
+{
+ private final Node node;
+
+ public DriverNode(final Node aNode)
+ {
+ this.node = aNode;
+ }
+
+ /**
+ * Get the host id of the node.
+ *
+ * @return The host id of the node.
+ */
+ public UUID getId()
+ {
+ return node.getHostId();
+ }
+
+ /**
+ * Get the public ip address of the node.
+ *
+ * @return The public ip address of the node.
+ */
+ public InetAddress getPublicAddress()
+ {
+ return node.getBroadcastAddress().get().getAddress();
+ }
+
+ /**
+ * Get the datacenter the node resides in.
+ *
+ * @return The datacenter of the node.
+ */
+ public String getDatacenter()
+ {
+ return node.getDatacenter();
+ }
+
+ /**
+ * Check for equality.
+ */
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+ DriverNode that = (DriverNode) o;
+ return node.equals(that.node);
+ }
+
+ @Override
+ public final int hashCode()
+ {
+ return Objects.hash(node);
+ }
+
+ @Override
+ public final String toString()
+ {
+ return String.format("Node(%s:%s:%s)", getId(), getDatacenter(), getPublicAddress());
+ }
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/Metadata.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/Metadata.java
new file mode 100644
index 000000000..6dcc06425
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/Metadata.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.metadata;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
+import com.datastax.oss.driver.internal.core.util.Strings;
+
+import java.util.Optional;
+
+/**
+ * Helper class to retrieve keyspace and table metadata,
+ * this should be preferred than doing session.getMetadata().getKeyspace(name) or keyspaceMetadata.getTable(name)
+ * Main purpose is to not have to care if the keyspace/table string representation is quoted or not.
+ * In driver, keyspaces/tables with camelCase needs to be quoted.
+ */
+public final class Metadata
+{
+ private Metadata()
+ {
+ //Intentionally left empty
+ }
+
+ public static Optional getKeyspace(final CqlSession session, final String keyspace)
+ {
+ String keyspaceName = quoteIfNeeded(keyspace);
+ return session.getMetadata().getKeyspace(keyspaceName);
+ }
+
+ public static Optional getTable(final KeyspaceMetadata keyspaceMetadata, final String table)
+ {
+ String tableName = quoteIfNeeded(table);
+ return keyspaceMetadata.getTable(tableName);
+ }
+
+ public static String quoteIfNeeded(final String keyspaceOrTable)
+ {
+ return Strings.needsDoubleQuotes(keyspaceOrTable) && !Strings.isDoubleQuoted(keyspaceOrTable)
+ ? Strings.doubleQuote(keyspaceOrTable)
+ : keyspaceOrTable;
+ }
+}
+
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/NodeResolver.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/NodeResolver.java
new file mode 100644
index 000000000..0d7d8b9c6
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/NodeResolver.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.metadata;
+
+import java.net.InetAddress;
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * Node resolver interface.
+ */
+public interface NodeResolver
+{
+ /**
+ * Retrieve a node based on public ip address.
+ *
+ * @param inetAddress The public ip address of the node instance.
+ * @return The node.
+ */
+ Optional fromIp(InetAddress inetAddress);
+
+ Optional fromUUID(UUID nodeId);
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/package-info.java
new file mode 100644
index 000000000..173afadf7
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains the Interfaces and resources for mapping node metadata.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.metadata;
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/LongTokenRange.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/LongTokenRange.java
new file mode 100644
index 000000000..26bdb2caa
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/LongTokenRange.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.state;
+
+import java.math.BigInteger;
+
+/**
+ * A representation of a token range in Cassandra.
+ */
+@SuppressWarnings("VisibilityModifier")
+public class LongTokenRange
+{
+ private static final int HASH_THIRTYONE = 31;
+ private static final int HASH_THIRTYTWO = 32;
+
+ private static final int LONG_VALUE_BITS = 64;
+
+ public static final BigInteger RANGE_END =
+ BigInteger.valueOf(2).pow(LONG_VALUE_BITS - 1).subtract(BigInteger.ONE); // Long.MAX_VALUE
+ public static final BigInteger FULL_RANGE =
+ BigInteger.valueOf(2).pow(LONG_VALUE_BITS);
+
+ public final long start;
+ public final long end;
+
+ public LongTokenRange(final long aStart, final long anEnd)
+ {
+ this.start = aStart;
+ this.end = anEnd;
+ }
+
+ /**
+ * Check if the token range is wrapping around.
+ *
+ * @return True in case this token range wraps around.
+ */
+ public boolean isWrapAround()
+ {
+ return start >= end;
+ }
+
+ /**
+ * Calculate the size of the token range.
+ *
+ * @return The size of the token range.
+ */
+ public BigInteger rangeSize()
+ {
+ BigInteger tokenStart = BigInteger.valueOf(start);
+ BigInteger tokenEnd = BigInteger.valueOf(end);
+
+ BigInteger rangeSize = tokenEnd.subtract(tokenStart);
+
+ if (rangeSize.compareTo(BigInteger.ZERO) <= 0)
+ {
+ rangeSize = rangeSize.add(FULL_RANGE);
+ }
+
+ return rangeSize;
+ }
+
+ /**
+ * Check if this range covers the other range.
+ *
+ * The range (I, J] covers (K, L] if:
+ *
+ * I <= K and J >= L if either both are wrapping or not wrapping.
+ *
+ * I <= K or J >= L if this is wrapping.
+ *
+ * @param other The token range to check if this is covering.
+ * @return True if this token range covers the provided token range.
+ */
+ public boolean isCovering(final LongTokenRange other)
+ {
+ boolean thisWraps = isWrapAround();
+ boolean otherWraps = other.isWrapAround();
+
+ if (thisWraps == otherWraps)
+ {
+ // Normal case - are we including the other range
+ return start <= other.start && end >= other.end;
+ }
+ else if (thisWraps)
+ {
+ // If only this wraps we cover it if either:
+ // start is before the other start
+ // end is after the other end
+ return this.start <= other.start || this.end >= other.end;
+ }
+
+ // If the other wraps but we don't we can't possibly cover it
+ return false;
+ }
+
+ @Override
+ public final String toString()
+ {
+ return String.format("(%s,%s]", start, end);
+ }
+
+ @Override
+ public final boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ LongTokenRange that = (LongTokenRange) o;
+
+ if (start != that.start)
+ {
+ return false;
+ }
+ return end == that.end;
+ }
+
+ @Override
+ public final int hashCode()
+ {
+ int result = (int) (start ^ (start >>> HASH_THIRTYTWO));
+ result = HASH_THIRTYONE * result + (int) (end ^ (end >>> HASH_THIRTYTWO));
+ return result;
+ }
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/ReplicationState.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/ReplicationState.java
new file mode 100644
index 000000000..019596e73
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/ReplicationState.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.state;
+
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Replication state interface used to retrieve mappings between token range to responsible nodes.
+ *
+ * Within a keyspace the methods are expected to return the exact same object instance for a set of nodes.
+ */
+public interface ReplicationState
+{
+ /**
+ * Get the nodes that are responsible for the provided token range.
+ * The provided token range can be a sub range of an existing one.
+ *
+ * @param tableReference The table used to calculate the proper replication.
+ * @param tokenRange The token range to get nodes for.
+ * @return The responsible nodes or null if either the token range does not exist or is intersecting two ranges.
+ */
+ ImmutableSet getNodes(TableReference tableReference, LongTokenRange tokenRange, Node currentNode);
+
+ /**
+ * Get the nodes that are a replica for the provided table that have ranges in common with the local node.
+ *
+ * @param tableReference The table to fetch replicas for.
+ * @return The replicas for the table
+ */
+ ImmutableSet getReplicas(TableReference tableReference, Node currentNode);
+
+ /**
+ * Get the nodes that are responsible for the provided token range, check clusterwide.
+ * The provided token range can be a sub range of an existing one.
+ *
+ * @param tableReference The table used to calculate the proper replication.
+ * @param tokenRange The token range to get nodes for.
+ * @return The responsible nodes or null if either the token range does not exist or is intersecting two ranges.
+ */
+ ImmutableSet getNodesClusterWide(TableReference tableReference, LongTokenRange tokenRange, Node currentNode);
+
+ /**
+ * Get a map of the current replication state for the provided table.
+ *
+ * @param tableReference
+ * The table used to calculate the proper replication.
+ * @return The map consisting of token -> responsible nodes.
+ */
+ Map> getTokenRangeToReplicas(TableReference tableReference, Node currentNode);
+
+ Map> getTokenRanges(TableReference tableReference, Node currentNode);
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/package-info.java
new file mode 100644
index 000000000..34d4750dd
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains the interfaces and resources for stateful declarations.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.state;
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReference.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReference.java
new file mode 100644
index 000000000..bd90ab7f2
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReference.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.table;
+
+import java.util.UUID;
+
+/**
+ * An interface containing keyspace/table mapping to avoid passing around two strings to refer to one specific table.
+ */
+public interface TableReference
+{
+ UUID getId();
+
+ String getTable();
+
+ String getKeyspace();
+
+ int getGcGraceSeconds();
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReferenceFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReferenceFactory.java
new file mode 100644
index 000000000..8c9f08750
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReferenceFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.table;
+
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
+import com.ericsson.bss.cassandra.ecchronos.core.exceptions.EcChronosException;
+import java.util.Set;
+
+/**
+ * A factory that generates table references.
+ */
+public interface TableReferenceFactory
+{
+ /**
+ * Get a table reference for the provided keyspace/table pair.
+ *
+ * @param keyspace The keyspace name.
+ * @param table The table name.
+ * @return A table reference for the provided keyspace/table pair or null if table does not exist.
+ */
+ TableReference forTable(String keyspace, String table);
+
+ /**
+ * Get a table reference for the provided TableMetadata.
+ *
+ * @param table the TableMetadata.
+ * @return A table reference for the provided keyspace/table pair.
+ */
+ TableReference forTable(TableMetadata table);
+
+ /**
+ * Get all table references in keyspace.
+ *
+ * @param keyspace The keyspace name
+ * @throws com.ericsson.bss.cassandra.ecchronos.core.exceptions.EcChronosException if keyspace does not exist.
+ * @return A unique set of all table references for a specific keyspace.
+ */
+ Set forKeyspace(String keyspace) throws EcChronosException;
+
+ /**
+ * Get all table references for a cluster (all keyspaces, all tables).
+ *
+ * @return A unique set of all table references for the cluster.
+ */
+ Set forCluster();
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/package-info.java
new file mode 100644
index 000000000..7d66e62ed
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains the interfaces and resources for tables and keyspace.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.table;
diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/TestMetadata.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/TestMetadata.java
new file mode 100644
index 000000000..a3992ba92
--- /dev/null
+++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/TestMetadata.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.metadata;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class TestMetadata
+{
+ @Mock
+ CqlSession cqlSessionMock;
+
+ @Mock
+ com.datastax.oss.driver.api.core.metadata.Metadata metadataMock;
+
+ @Mock
+ KeyspaceMetadata keyspaceMetadataMock;
+
+ @Mock
+ TableMetadata tableMetadataMock;
+
+ @Before
+ public void setup()
+ {
+ when(cqlSessionMock.getMetadata()).thenReturn(metadataMock);
+ }
+
+ @Test
+ public void testGetKeyspace()
+ {
+ String keyspace = "keyspace1";
+ when(metadataMock.getKeyspace(eq(keyspace))).thenReturn(Optional.of(keyspaceMetadataMock));
+ assertThat(Metadata.getKeyspace(cqlSessionMock, keyspace)).isNotEmpty();
+ }
+
+ @Test
+ public void testGetKeyspaceWithCamelCase()
+ {
+ String keyspace = "keyspace1WithCamelCase";
+ when(metadataMock.getKeyspace(eq("\""+keyspace+"\""))).thenReturn(Optional.of(keyspaceMetadataMock));
+ assertThat(Metadata.getKeyspace(cqlSessionMock, keyspace)).isNotEmpty();
+ }
+
+ @Test
+ public void testGetKeyspaceWithCamelCaseAlreadyQuoted()
+ {
+ String keyspace = "keyspace1WithCamelCase";
+ when(metadataMock.getKeyspace(eq("\""+keyspace+"\""))).thenReturn(Optional.of(keyspaceMetadataMock));
+ assertThat(Metadata.getKeyspace(cqlSessionMock, "\""+keyspace+"\"")).isNotEmpty();
+ }
+
+ @Test
+ public void testGetTable()
+ {
+ String table = "table1";
+ when(keyspaceMetadataMock.getTable(eq(table))).thenReturn(Optional.of(tableMetadataMock));
+ assertThat(Metadata.getTable(keyspaceMetadataMock, table)).isNotEmpty();
+ }
+
+ @Test
+ public void testGetTableWithCamelCase()
+ {
+ String table = "table1WithCamelCase";
+ when(keyspaceMetadataMock.getTable(eq("\""+table+"\""))).thenReturn(Optional.of(tableMetadataMock));
+ assertThat(Metadata.getTable(keyspaceMetadataMock, table)).isNotEmpty();
+ }
+
+ @Test
+ public void testGetTableWithCamelCaseAlreadyQuoted()
+ {
+ String table = "table1WithCamelCase";
+ when(keyspaceMetadataMock.getTable(eq("\""+table+"\""))).thenReturn(Optional.of(tableMetadataMock));
+ assertThat(Metadata.getTable(keyspaceMetadataMock, "\""+table+"\"")).isNotEmpty();
+ }
+}
diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/state/TestLongTokenRange.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/state/TestLongTokenRange.java
new file mode 100644
index 000000000..ff706ec5a
--- /dev/null
+++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/state/TestLongTokenRange.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.state;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+import java.math.BigInteger;
+
+public class TestLongTokenRange
+{
+ private static final BigInteger FULL_RANGE = BigInteger.valueOf(2).pow(64);
+
+ @Test
+ public void testRangesEqual()
+ {
+ LongTokenRange range1 = new LongTokenRange(1, 2);
+ LongTokenRange range2 = new LongTokenRange(1, 2);
+
+ assertThat(range1).isEqualTo(range2);
+ assertThat(range1.hashCode()).isEqualTo(range2.hashCode());
+ }
+
+ @Test
+ public void testRangesNotEqual()
+ {
+ LongTokenRange range1 = new LongTokenRange(1, 2);
+ LongTokenRange range2 = new LongTokenRange(2, 3);
+
+ assertThat(range1).isNotEqualTo(range2);
+ }
+
+ @Test
+ public void testIsWrapAroundNonWrapping()
+ {
+ assertThat(new LongTokenRange(1, 2).isWrapAround()).isFalse();
+ }
+
+ @Test
+ public void testIsWrapAroundWrapping()
+ {
+ assertThat(new LongTokenRange(2, 1).isWrapAround()).isTrue();
+ }
+
+ @Test
+ public void testIsWrapAroundFullRange()
+ {
+ assertThat(new LongTokenRange(Long.MIN_VALUE, Long.MIN_VALUE).isWrapAround()).isTrue();
+ }
+
+ @Test
+ public void testRangeSizeFullRange()
+ {
+ LongTokenRange tokenRange = new LongTokenRange(Long.MIN_VALUE, Long.MIN_VALUE);
+
+ assertThat(tokenRange.rangeSize()).isEqualTo(FULL_RANGE);
+ }
+
+ @Test
+ public void testRangeSizePositive()
+ {
+ LongTokenRange tokenRange = new LongTokenRange(10, 123456789);
+ BigInteger expectedRangeSize = BigInteger.valueOf(123456779);
+
+ assertThat(tokenRange.rangeSize()).isEqualTo(expectedRangeSize);
+ }
+
+ @Test
+ public void testRangeSizeNegative()
+ {
+ LongTokenRange tokenRange = new LongTokenRange(-123456789, -10);
+ BigInteger expectedRangeSize = BigInteger.valueOf(123456779);
+
+ assertThat(tokenRange.rangeSize()).isEqualTo(expectedRangeSize);
+ }
+
+ @Test
+ public void testRangeSizeNegativeToPositive()
+ {
+ LongTokenRange tokenRange = new LongTokenRange(-500, 1500);
+ BigInteger expectedRangeSize = BigInteger.valueOf(2000);
+
+ assertThat(tokenRange.rangeSize()).isEqualTo(expectedRangeSize);
+ }
+
+ @Test
+ public void testRangeSizeWrapAround()
+ {
+ LongTokenRange tokenRange = new LongTokenRange(10, -10);
+ BigInteger expectedRangeSize = FULL_RANGE.subtract(BigInteger.valueOf(20));
+
+ assertThat(tokenRange.rangeSize()).isEqualTo(expectedRangeSize);
+ }
+
+ @Test
+ public void testNotCovering()
+ {
+ LongTokenRange range1 = new LongTokenRange(50, 500);
+ LongTokenRange range2 = new LongTokenRange(10, 40);
+
+ assertThat(range1.isCovering(range2)).isFalse();
+ }
+
+ @Test
+ public void testNotCoveringNegative()
+ {
+ LongTokenRange range1 = new LongTokenRange(-500, -50);
+ LongTokenRange range2 = new LongTokenRange(-40, -10);
+
+ assertThat(range1.isCovering(range2)).isFalse();
+ }
+
+ @Test
+ public void testNotCoveringOneWrapping()
+ {
+ LongTokenRange range1 = new LongTokenRange(50, -50);
+ LongTokenRange range2 = new LongTokenRange(10, 40);
+
+ assertThat(range1.isCovering(range2)).isFalse();
+ }
+
+ @Test
+ public void testNotCoveringBothWrapping()
+ {
+ LongTokenRange range1 = new LongTokenRange(50, -50);
+ LongTokenRange range2 = new LongTokenRange(10, -40);
+
+ assertThat(range1.isCovering(range2)).isFalse();
+ }
+
+ @Test
+ public void testCoveringSubRanges()
+ {
+ LongTokenRange range1 = new LongTokenRange(50, 500);
+ LongTokenRange range2 = new LongTokenRange(50, 200);
+ LongTokenRange range3 = new LongTokenRange(200, 350);
+ LongTokenRange range4 = new LongTokenRange(350, 500);
+
+ assertThat(range1.isCovering(range2)).isTrue();
+ assertThat(range1.isCovering(range3)).isTrue();
+ assertThat(range1.isCovering(range4)).isTrue();
+ }
+
+ @Test
+ public void testCoveringSubRangesNegative()
+ {
+ LongTokenRange range1 = new LongTokenRange(-500, -50);
+ LongTokenRange range2 = new LongTokenRange(-500, -350);
+ LongTokenRange range3 = new LongTokenRange(-350, -200);
+ LongTokenRange range4 = new LongTokenRange(-200, -50);
+
+ assertThat(range1.isCovering(range2)).isTrue();
+ assertThat(range1.isCovering(range3)).isTrue();
+ assertThat(range1.isCovering(range4)).isTrue();
+ }
+
+ @Test
+ public void testCoveringOneWrapping()
+ {
+ LongTokenRange range1 = new LongTokenRange(50, -50);
+ LongTokenRange range2 = new LongTokenRange(60, 70);
+
+ assertThat(range1.isCovering(range2)).isTrue();
+ }
+
+ @Test
+ public void testCoveringBothWrapping()
+ {
+ LongTokenRange range1 = new LongTokenRange(50, -50);
+ LongTokenRange range2 = new LongTokenRange(60, -60);
+
+ assertThat(range1.isCovering(range2)).isTrue();
+ }
+
+ @Test
+ public void testEqualsContract()
+ {
+ EqualsVerifier.forClass(LongTokenRange.class).usingGetClass().verify();
+ }
+}
diff --git a/pom.xml b/pom.xml
index 62a10baaa..df2ac8dcb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,8 @@
connection.impl
application
data
+ core
+ core.impl