Skip to content

Commit

Permalink
Use same time for reading history and snapshot (#550)
Browse files Browse the repository at this point in the history
  • Loading branch information
masokol authored Aug 17, 2023
1 parent 1bca4be commit 43b9a39
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ public final void update()
{
RepairStateSnapshot oldRepairStateSnapshot = myRepairStateSnapshot.get();

long now = System.currentTimeMillis();
if (oldRepairStateSnapshot == null
|| oldRepairStateSnapshot.lastRepairedAt() < System.currentTimeMillis() - myRepairConfiguration.getRepairIntervalInMs())
|| oldRepairStateSnapshot.lastRepairedAt() < now - myRepairConfiguration.getRepairIntervalInMs())
{
RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot);
RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot, now);
if (myRepairStateSnapshot.compareAndSet(oldRepairStateSnapshot, newRepairStateSnapshot))
{
myTableRepairMetrics.lastRepairedAt(myTableReference, newRepairStateSnapshot.lastRepairedAt());
Expand All @@ -94,14 +95,14 @@ public RepairStateSnapshot getSnapshot()
return myRepairStateSnapshot.get();
}

private RepairStateSnapshot generateNewRepairState(RepairStateSnapshot old)
private RepairStateSnapshot generateNewRepairState(RepairStateSnapshot old, long now)
{
VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old);
VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old, now);

return generateSnapshotForVnode(vnodeRepairStates);
return generateSnapshotForVnode(vnodeRepairStates, now);
}

private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepairStates)
private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepairStates, long createdAt)
{
long repairedAt = calculateRepairedAt(vnodeRepairStates);

Expand All @@ -117,6 +118,7 @@ private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepa
return RepairStateSnapshot.newBuilder()
.withLastRepairedAt(repairedAt)
.withVnodeRepairStates(updatedVnodeRepairStates)
.withCreatedAt(createdAt)
.withReplicaRepairGroups(replicaRepairGroups)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public interface VnodeRepairStateFactory
*
* @param tableReference The table to calculate the new repair state for vnodes.
* @param previous The previous repair state or null if non exists.
* @param iterateToTime The time to iterate repair entries to.
* @return The calculated repair state.
*/
VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous);
VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous, long iterateToTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,22 @@ public VnodeRepairStateFactoryImpl(ReplicationState replicationState, RepairHist
}

@Override
public VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous)
public VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous, long iterateToTime)
{
Map<LongTokenRange, ImmutableSet<Host>> tokenRangeToReplicaMap = myReplicationState.getTokenRangeToReplicas(tableReference);
long lastRepairedAt = previousLastRepairedAt(previous, tokenRangeToReplicaMap);
long now = System.currentTimeMillis();

Iterator<RepairEntry> repairEntryIterator;

if (lastRepairedAt == VnodeRepairState.UNREPAIRED)
{
LOG.debug("No last repaired at found for {}, iterating over all repair entries", tableReference);
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
}
else
{
LOG.debug("Table {} snapshot created at {}, iterating repir entries until that time", tableReference, previous.getCreatedAt());
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
LOG.debug("Table {} snapshot created at {}, iterating repair entries until that time", tableReference, previous.getCreatedAt());
repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap));
}

return generateVnodeRepairStates(lastRepairedAt, previous, repairEntryIterator, tokenRangeToReplicaMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -82,7 +83,7 @@ public void testInitialEmptyState()
VnodeRepairStates vnodeRepairStates = VnodeRepairStates.newBuilder(Collections.singletonList(vnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class))).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList());

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down Expand Up @@ -116,7 +117,7 @@ public void testPartiallyRepaired()
VnodeRepairStates vnodeRepairStates = VnodeRepairStates.newBuilder(Arrays.asList(vnodeRepairState, repairedVnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class))).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Collections.singletonList(mockReplicaRepairGroup));

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down Expand Up @@ -150,7 +151,7 @@ public void testUpdateRepaired()
VnodeRepairStates vnodeRepairStates = VnodeRepairStates.newBuilder(Collections.singletonList(vnodeRepairState))
.build();

when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class))).thenReturn(vnodeRepairStates);
when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class), any(long.class))).thenReturn(vnodeRepairStates);
when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList());

RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testEmptyHistoryNoPreviousIsUnrepaired() throws UnknownHostException

VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider);

VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null);
VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null, System.currentTimeMillis());

assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates);
}
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testEmptyHistoryWithPreviousKeepsRepairedAt() throws UnknownHostExce

VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider);

VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, previousRepairState);
VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, previousRepairState, System.currentTimeMillis());

assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates);
}
Expand Down Expand Up @@ -145,7 +145,7 @@ public void testWithHistoryNoPreviousIsRepaired() throws UnknownHostException

VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider);

VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null);
VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null, System.currentTimeMillis());

assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates);
}
Expand Down Expand Up @@ -181,7 +181,7 @@ public void testWithHistoryNoPreviousIsPartiallyRepaired() throws UnknownHostExc

VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider);

VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null);
VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null, System.currentTimeMillis());

assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates);
}
Expand Down Expand Up @@ -217,7 +217,7 @@ public void testWithOldHistoryNoPreviousIsPartiallyRepaired() throws UnknownHost

VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider);

VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null);
VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null, System.currentTimeMillis());

assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates);
}
Expand Down Expand Up @@ -266,7 +266,7 @@ public void testWithHistoryAndPreviousAfterScaleOut() throws UnknownHostExceptio
.withReplicaRepairGroups(Collections.emptyList())
.build();

VnodeRepairStates actualVnodeRepairStatesAfter = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, repairStateSnapshot);
VnodeRepairStates actualVnodeRepairStatesAfter = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, repairStateSnapshot, System.currentTimeMillis());

assertThat(actualVnodeRepairStatesAfter).isEqualTo(expectedVnodeRepairStatesAfter);
}
Expand Down Expand Up @@ -307,7 +307,7 @@ public void testWithHistoryAndPreviousOnlyIteratesOverDiff() throws UnknownHostE
VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState,
repairHistoryProvider);

VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null);
VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null, System.currentTimeMillis());

assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates);

Expand All @@ -323,7 +323,7 @@ public void testWithHistoryAndPreviousOnlyIteratesOverDiff() throws UnknownHostE
when(repairHistoryProvider.iterate(eq(TABLE_REFERENCE), any(long.class), eq(firstSnapshotCreatedAt),
any(Predicate.class))).thenReturn(secondIterateRepairEntries.iterator());
VnodeRepairStates updatedVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE,
firstRepairStateSnapshot);
firstRepairStateSnapshot, System.currentTimeMillis());

assertThat(updatedVnodeRepairStates).isEqualTo(expectedVnodeRepairStates);

Expand All @@ -342,7 +342,7 @@ public void testWithHistoryAndPreviousOnlyIteratesOverDiff() throws UnknownHostE
when(repairHistoryProvider.iterate(eq(TABLE_REFERENCE), any(long.class), eq(secondSnapshotCreatedAt),
any(Predicate.class))).thenReturn(thirdIterateRepairEntries.iterator());
updatedVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE,
secondRepairStateSnapshot);
secondRepairStateSnapshot, System.currentTimeMillis());

expectedVnodeRepairStates = VnodeRepairStates.newBuilder(Arrays.asList(
new VnodeRepairState(longTokenRange1, replicas, updateRange1RepairedAt),
Expand Down

0 comments on commit 43b9a39

Please sign in to comment.