Skip to content

Commit

Permalink
Merge branch 'ecchronos-1.2' into ecchronos-2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
masokol committed Aug 17, 2023
2 parents 20bd788 + 0b83f7f commit ca17718
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ public final void update()
{
RepairStateSnapshot oldRepairStateSnapshot = myRepairStateSnapshot.get();

long now = System.currentTimeMillis();
if (oldRepairStateSnapshot == null
|| oldRepairStateSnapshot.lastCompletedAt() < System.currentTimeMillis() - myRepairConfiguration.getRepairIntervalInMs())
|| oldRepairStateSnapshot.lastCompletedAt() < now - myRepairConfiguration.getRepairIntervalInMs())
{
RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot);
RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot, now);
if (myRepairStateSnapshot.compareAndSet(oldRepairStateSnapshot, newRepairStateSnapshot))
{
myTableRepairMetrics.lastRepairedAt(myTableReference, newRepairStateSnapshot.lastCompletedAt());
Expand Down Expand Up @@ -98,14 +99,14 @@ public RepairStateSnapshot getSnapshot()
return myRepairStateSnapshot.get();
}

private RepairStateSnapshot generateNewRepairState(RepairStateSnapshot old)
private RepairStateSnapshot generateNewRepairState(RepairStateSnapshot old, long now)
{
VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old);
VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old, now);

return generateSnapshotForVnode(vnodeRepairStates);
return generateSnapshotForVnode(vnodeRepairStates, now);
}

private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepairStates)
private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepairStates, long createdAt)
{
long repairedAt = calculateRepairedAt(vnodeRepairStates);

Expand All @@ -121,6 +122,7 @@ private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepa
return RepairStateSnapshot.newBuilder()
.withLastCompletedAt(repairedAt)
.withVnodeRepairStates(updatedVnodeRepairStates)
.withCreatedAt(createdAt)
.withReplicaRepairGroups(replicaRepairGroups)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public interface VnodeRepairStateFactory
*
* @param tableReference The table to calculate the new repair state for vnodes.
* @param previous The previous repair state or null if non exists.
* @param iterateToTime The time to iterate repair entries to.
* @return The calculated repair state.
*/
VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous);
VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous, long iterateToTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,22 @@ public VnodeRepairStateFactoryImpl(ReplicationState replicationState, RepairHist
}

@Override
public VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous)
public VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous, long iterateToTime)
{
Map<LongTokenRange, ImmutableSet<Node>> tokenRangeToReplicaMap = myReplicationState.getTokenRangeToReplicas(tableReference);
long lastRepairedAt = previousLastRepairedAt(previous, tokenRangeToReplicaMap);
long now = System.currentTimeMillis();

Iterator<RepairEntry> repairEntryIterator;

if (lastRepairedAt == VnodeRepairState.UNREPAIRED)
{
LOG.debug("No last repaired at found for {}, iterating over all repair entries", tableReference);
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
}
else
{
LOG.debug("Table {} snapshot created at {}, iterating repir entries until that time", tableReference, previous.getCreatedAt());
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
LOG.debug("Table {} snapshot created at {}, iterating repair entries until that time", tableReference, previous.getCreatedAt());
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
}

return generateVnodeRepairStates(lastRepairedAt, previous, repairEntryIterator, tokenRangeToReplicaMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static com.ericsson.bss.cassandra.ecchronos.core.MockTableReferenceFactory.tableReference;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -88,7 +89,7 @@ public void testInitialEmptyState()
VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Collections.singletonList(vnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull())).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList());

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down Expand Up @@ -123,7 +124,7 @@ public void testPartiallyRepaired()
VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Arrays.asList(vnodeRepairState, repairedVnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull())).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Collections.singletonList(mockReplicaRepairGroup));

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down Expand Up @@ -158,7 +159,7 @@ public void testUpdateRepaired()
VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Collections.singletonList(vnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull())).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList());

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ private void assertNewState(VnodeRepairStateFactory factory, RepairStateSnapshot

private void assertNewState(VnodeRepairStateFactory factory, RepairStateSnapshot previous, Class<? extends VnodeRepairStates> expectedClass, Collection<VnodeRepairState> expectedStates)
{
VnodeRepairStates newStates = factory.calculateNewState(TABLE_REFERENCE, previous);
VnodeRepairStates newStates = factory.calculateNewState(TABLE_REFERENCE, previous, System.currentTimeMillis());
assertThat(newStates).isInstanceOf(expectedClass);

Collection<VnodeRepairState> vnodeRepairStates = newStates.getVnodeRepairStates();
Expand Down

0 comments on commit ca17718

Please sign in to comment.