Skip to content

Commit

Permalink
Merge branch 'ecchronos-1.1' into ecchronos-1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
masokol committed Aug 17, 2023
2 parents cb99479 + f117cc4 commit 0b83f7f
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ public final void update()
{
RepairStateSnapshot oldRepairStateSnapshot = myRepairStateSnapshot.get();

long now = System.currentTimeMillis();
if (oldRepairStateSnapshot == null
|| oldRepairStateSnapshot.lastRepairedAt() < System.currentTimeMillis() - myRepairConfiguration.getRepairIntervalInMs())
|| oldRepairStateSnapshot.lastRepairedAt() < now - myRepairConfiguration.getRepairIntervalInMs())
{
RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot);
RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot, now);
if (myRepairStateSnapshot.compareAndSet(oldRepairStateSnapshot, newRepairStateSnapshot))
{
myTableRepairMetrics.lastRepairedAt(myTableReference, newRepairStateSnapshot.lastRepairedAt());
Expand All @@ -94,14 +95,14 @@ public RepairStateSnapshot getSnapshot()
return myRepairStateSnapshot.get();
}

private RepairStateSnapshot generateNewRepairState(RepairStateSnapshot old)
private RepairStateSnapshot generateNewRepairState(RepairStateSnapshot old, long now)
{
VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old);
VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old, now);

return generateSnapshotForVnode(vnodeRepairStates);
return generateSnapshotForVnode(vnodeRepairStates, now);
}

private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepairStates)
private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepairStates, long createdAt)
{
long repairedAt = calculateRepairedAt(vnodeRepairStates);

Expand All @@ -117,6 +118,7 @@ private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepa
return RepairStateSnapshot.newBuilder()
.withLastRepairedAt(repairedAt)
.withVnodeRepairStates(updatedVnodeRepairStates)
.withCreatedAt(createdAt)
.withReplicaRepairGroups(replicaRepairGroups)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public interface VnodeRepairStateFactory
*
* @param tableReference The table to calculate the new repair state for vnodes.
* @param previous The previous repair state or null if non exists.
* @param iterateToTime The time to iterate repair entries to.
* @return The calculated repair state.
*/
VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous);
VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous, long iterateToTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,22 @@ public VnodeRepairStateFactoryImpl(ReplicationState replicationState, RepairHist
}

@Override
public VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous)
public VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous, long iterateToTime)
{
Map<LongTokenRange, ImmutableSet<Host>> tokenRangeToReplicaMap = myReplicationState.getTokenRangeToReplicas(tableReference);
long lastRepairedAt = previousLastRepairedAt(previous, tokenRangeToReplicaMap);
long now = System.currentTimeMillis();

Iterator<RepairEntry> repairEntryIterator;

if (lastRepairedAt == VnodeRepairState.UNREPAIRED)
{
LOG.debug("No last repaired at found for {}, iterating over all repair entries", tableReference);
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
}
else
{
LOG.debug("Table {} snapshot created at {}, iterating repir entries until that time", tableReference, previous.getCreatedAt());
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
LOG.debug("Table {} snapshot created at {}, iterating repair entries until that time", tableReference, previous.getCreatedAt());
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
}

return generateVnodeRepairStates(lastRepairedAt, previous, repairEntryIterator, tokenRangeToReplicaMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -82,7 +83,7 @@ public void testInitialEmptyState()
VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Collections.singletonList(vnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class))).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList());

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down Expand Up @@ -116,7 +117,7 @@ public void testPartiallyRepaired()
VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Arrays.asList(vnodeRepairState, repairedVnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class))).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Collections.singletonList(mockReplicaRepairGroup));

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down Expand Up @@ -150,7 +151,7 @@ public void testUpdateRepaired()
VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Collections.singletonList(vnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class))).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList());

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ private void assertNewState(VnodeRepairStateFactory factory, RepairStateSnapshot

private void assertNewState(VnodeRepairStateFactory factory, RepairStateSnapshot previous, Class<? extends VnodeRepairStates> expectedClass, Collection<VnodeRepairState> expectedStates)
{
VnodeRepairStates newStates = factory.calculateNewState(TABLE_REFERENCE, previous);
VnodeRepairStates newStates = factory.calculateNewState(TABLE_REFERENCE, previous, System.currentTimeMillis());
assertThat(newStates).isInstanceOf(expectedClass);

Collection<VnodeRepairState> vnodeRepairStates = newStates.getVnodeRepairStates();
Expand Down

0 comments on commit 0b83f7f

Please sign in to comment.