Skip to content

Commit

Permalink
Merge branch 'ecchronos-3.0' into ecchronos-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
masokol committed Aug 17, 2023
2 parents 1d056c5 + a0cebf5 commit a9ab82a
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ public RepairStateImpl(final TableReference tableReference,
public final void update()
{
RepairStateSnapshot oldRepairStateSnapshot = myRepairStateSnapshot.get();
long now = System.currentTimeMillis();
if (oldRepairStateSnapshot == null
|| isRepairNeeded(oldRepairStateSnapshot.lastCompletedAt(),
oldRepairStateSnapshot.getEstimatedRepairTime(),
System.currentTimeMillis()))
now))
{
RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot);
RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot, now);
if (myRepairStateSnapshot.compareAndSet(oldRepairStateSnapshot, newRepairStateSnapshot))
{
myTableRepairMetrics.lastRepairedAt(myTableReference, newRepairStateSnapshot.lastCompletedAt());
Expand Down Expand Up @@ -114,15 +115,15 @@ public RepairStateSnapshot getSnapshot()
return myRepairStateSnapshot.get();
}

private RepairStateSnapshot generateNewRepairState(final RepairStateSnapshot old)
private RepairStateSnapshot generateNewRepairState(final RepairStateSnapshot old, final long now)
{
VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old);
VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old, now);

return generateSnapshotForVnode(vnodeRepairStates, old);
return generateSnapshotForVnode(vnodeRepairStates, old, now);
}

private RepairStateSnapshot generateSnapshotForVnode(final VnodeRepairStates vnodeRepairStates,
final RepairStateSnapshot old)
final RepairStateSnapshot old, final long createdAt)
{
long repairedAt = calculateRepairedAt(vnodeRepairStates, old);

Expand All @@ -139,6 +140,7 @@ private RepairStateSnapshot generateSnapshotForVnode(final VnodeRepairStates vno
return RepairStateSnapshot.newBuilder()
.withLastCompletedAt(repairedAt)
.withVnodeRepairStates(updatedVnodeRepairStates)
.withCreatedAt(createdAt)
.withReplicaRepairGroups(replicaRepairGroups)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ public interface VnodeRepairStateFactory
*
* @param tableReference The table to calculate the new repair state for vnodes.
* @param previous The previous repair state or null if non exists.
* @param iterateToTime The time to iterate repair entries to.
* @return The calculated repair state.
*/
VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous);
VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous,
long iterateToTime);

/**
* Calculate the repair state for a time window.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,27 @@ public VnodeRepairStateFactoryImpl(final ReplicationState replicationState,
* {@inheritDoc}
*/
@Override
public VnodeRepairStates calculateNewState(final TableReference tableReference, final RepairStateSnapshot previous)
public VnodeRepairStates calculateNewState(final TableReference tableReference, final RepairStateSnapshot previous,
final long iterateToTime)
{
Map<LongTokenRange, ImmutableSet<DriverNode>> tokenRangeToReplicaMap
= myReplicationState.getTokenRangeToReplicas(tableReference);
long lastRepairedAt = previousLastRepairedAt(previous, tokenRangeToReplicaMap);
long now = System.currentTimeMillis();

Iterator<RepairEntry> repairEntryIterator;

if (lastRepairedAt == VnodeRepairState.UNREPAIRED)
{
LOG.debug("No last repaired at found for {}, iterating over all repair entries", tableReference);
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference,
now, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime,
(repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
}
else
{
LOG.debug("Table {} snapshot created at {}, iterating repir entries until that time", tableReference,
LOG.debug("Table {} snapshot created at {}, iterating repair entries until that time", tableReference,
previous.getCreatedAt());
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, previous.getCreatedAt(),
(repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime,
previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
}

return generateVnodeRepairStates(lastRepairedAt, previous, repairEntryIterator, tokenRangeToReplicaMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static com.ericsson.bss.cassandra.ecchronos.core.MockTableReferenceFactory.tableReference;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -88,7 +89,7 @@ public void testInitialEmptyState()
VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Collections.singletonList(vnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull())).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList());

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down Expand Up @@ -124,7 +125,7 @@ public void testPartiallyRepaired()
VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Arrays.asList(vnodeRepairState, repairedVnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull())).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Collections.singletonList(mockReplicaRepairGroup));

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down Expand Up @@ -161,7 +162,7 @@ public void testUpdateRepaired()
VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Collections.singletonList(vnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull())).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList());

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand All @@ -181,7 +182,7 @@ public void testUpdateRepaired()
reset(mockTableRepairMetrics);

// Perform update
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), eq(repairStateSnapshot))).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), eq(repairStateSnapshot), any(long.class))).thenReturn(vnodeRepairStates);
repairState.update();

RepairStateSnapshot updatedRepairStateSnapshot = repairState.getSnapshot();
Expand Down Expand Up @@ -223,7 +224,7 @@ public void testIsRepairNeeded()
RepairConfiguration repairConfiguration = repairConfiguration(repairIntervalInMs);
VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Collections.singletonList(vnodeRepairState))
.build();
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull())).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList());
RepairStateImpl repairState = new RepairStateImpl(tableReference, repairConfiguration,
mockVnodeRepairStateFactory, mockHostStates,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ private void assertNewState(VnodeRepairStateFactory factory, RepairStateSnapshot

private void assertNewState(VnodeRepairStateFactory factory, RepairStateSnapshot previous, Class<? extends VnodeRepairStates> expectedClass, Collection<VnodeRepairState> expectedStates)
{
VnodeRepairStates newStates = factory.calculateNewState(TABLE_REFERENCE, previous);
VnodeRepairStates newStates = factory.calculateNewState(TABLE_REFERENCE, previous, System.currentTimeMillis());
assertThat(newStates).isInstanceOf(expectedClass);

Collection<VnodeRepairState> vnodeRepairStates = newStates.getVnodeRepairStates();
Expand Down

0 comments on commit a9ab82a

Please sign in to comment.