From 43b9a39ac3302e748e3060c156fa30b42d531bc6 Mon Sep 17 00:00:00 2001 From: masokol <97948057+masokol@users.noreply.github.com> Date: Thu, 17 Aug 2023 07:45:58 +0200 Subject: [PATCH] Use same time for reading history and snapshot (#550) --- .../core/repair/state/RepairStateImpl.java | 14 ++++++++------ .../repair/state/VnodeRepairStateFactory.java | 3 ++- .../state/VnodeRepairStateFactoryImpl.java | 9 ++++----- .../core/repair/state/TestRepairStateImpl.java | 7 ++++--- .../state/TestVnodeRepairStateFactoryImpl.java | 18 +++++++++--------- 5 files changed, 27 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateImpl.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateImpl.java index 6cecd6a3c..5eeb8a05f 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateImpl.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateImpl.java @@ -64,10 +64,11 @@ public final void update() { RepairStateSnapshot oldRepairStateSnapshot = myRepairStateSnapshot.get(); + long now = System.currentTimeMillis(); if (oldRepairStateSnapshot == null - || oldRepairStateSnapshot.lastRepairedAt() < System.currentTimeMillis() - myRepairConfiguration.getRepairIntervalInMs()) + || oldRepairStateSnapshot.lastRepairedAt() < now - myRepairConfiguration.getRepairIntervalInMs()) { - RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot); + RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot, now); if (myRepairStateSnapshot.compareAndSet(oldRepairStateSnapshot, newRepairStateSnapshot)) { myTableRepairMetrics.lastRepairedAt(myTableReference, newRepairStateSnapshot.lastRepairedAt()); @@ -94,14 +95,14 @@ public RepairStateSnapshot getSnapshot() return myRepairStateSnapshot.get(); } - private RepairStateSnapshot generateNewRepairState(RepairStateSnapshot old) + private RepairStateSnapshot generateNewRepairState(RepairStateSnapshot old, long now) { - VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old); + VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old, now); - return generateSnapshotForVnode(vnodeRepairStates); + return generateSnapshotForVnode(vnodeRepairStates, now); } - private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepairStates) + private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepairStates, long createdAt) { long repairedAt = calculateRepairedAt(vnodeRepairStates); @@ -117,6 +118,7 @@ private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepa return RepairStateSnapshot.newBuilder() .withLastRepairedAt(repairedAt) .withVnodeRepairStates(updatedVnodeRepairStates) + .withCreatedAt(createdAt) .withReplicaRepairGroups(replicaRepairGroups) .build(); } diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactory.java index 5948af837..5c48719bb 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactory.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactory.java @@ -28,7 +28,8 @@ public interface VnodeRepairStateFactory * * @param tableReference The table to calculate the new repair state for vnodes. * @param previous The previous repair state or null if non exists. + * @param iterateToTime The time to iterate repair entries to. * @return The calculated repair state. */ - VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous); + VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous, long iterateToTime); } diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java index ded716e70..b303e44be 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java @@ -46,23 +46,22 @@ public VnodeRepairStateFactoryImpl(ReplicationState replicationState, RepairHist } @Override - public VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous) + public VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous, long iterateToTime) { Map> tokenRangeToReplicaMap = myReplicationState.getTokenRangeToReplicas(tableReference); long lastRepairedAt = previousLastRepairedAt(previous, tokenRangeToReplicaMap); - long now = System.currentTimeMillis(); Iterator repairEntryIterator; if (lastRepairedAt == VnodeRepairState.UNREPAIRED) { LOG.debug("No last repaired at found for {}, iterating over all repair entries", tableReference); - repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap)); + repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap)); } else { - LOG.debug("Table {} snapshot created at {}, iterating repir entries until that time", tableReference, previous.getCreatedAt()); - repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap)); + LOG.debug("Table {} snapshot created at {}, iterating repair entries until that time", tableReference, previous.getCreatedAt()); + repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap)); } return generateVnodeRepairStates(lastRepairedAt, previous, repairEntryIterator, tokenRangeToReplicaMap); diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestRepairStateImpl.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestRepairStateImpl.java index b84a4fbad..c14808bd4 100644 --- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestRepairStateImpl.java +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestRepairStateImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.mock; @@ -82,7 +83,7 @@ public void testInitialEmptyState() VnodeRepairStates vnodeRepairStates = VnodeRepairStates.newBuilder(Collections.singletonList(vnodeRepairState)) .build(); - when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class))).thenReturn(vnodeRepairStates); + when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class), any(long.class))).thenReturn(vnodeRepairStates); when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList()); RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration, @@ -116,7 +117,7 @@ public void testPartiallyRepaired() VnodeRepairStates vnodeRepairStates = VnodeRepairStates.newBuilder(Arrays.asList(vnodeRepairState, repairedVnodeRepairState)) .build(); - when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class))).thenReturn(vnodeRepairStates); + when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class), any(long.class))).thenReturn(vnodeRepairStates); when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Collections.singletonList(mockReplicaRepairGroup)); RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration, @@ -150,7 +151,7 @@ public void testUpdateRepaired() VnodeRepairStates vnodeRepairStates = VnodeRepairStates.newBuilder(Collections.singletonList(vnodeRepairState)) .build(); - when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class))).thenReturn(vnodeRepairStates); + when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(RepairStateSnapshot.class), any(long.class))).thenReturn(vnodeRepairStates); when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList()); RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration, diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java index 8d1ad2515..7c51f6797 100644 --- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java @@ -75,7 +75,7 @@ public void testEmptyHistoryNoPreviousIsUnrepaired() throws UnknownHostException VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider); - VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null); + VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null, System.currentTimeMillis()); assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates); } @@ -109,7 +109,7 @@ public void testEmptyHistoryWithPreviousKeepsRepairedAt() throws UnknownHostExce VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider); - VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, previousRepairState); + VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, previousRepairState, System.currentTimeMillis()); assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates); } @@ -145,7 +145,7 @@ public void testWithHistoryNoPreviousIsRepaired() throws UnknownHostException VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider); - VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null); + VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null, System.currentTimeMillis()); assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates); } @@ -181,7 +181,7 @@ public void testWithHistoryNoPreviousIsPartiallyRepaired() throws UnknownHostExc VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider); - VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null); + VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null, System.currentTimeMillis()); assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates); } @@ -217,7 +217,7 @@ public void testWithOldHistoryNoPreviousIsPartiallyRepaired() throws UnknownHost VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider); - VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null); + VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null, System.currentTimeMillis()); assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates); } @@ -266,7 +266,7 @@ public void testWithHistoryAndPreviousAfterScaleOut() throws UnknownHostExceptio .withReplicaRepairGroups(Collections.emptyList()) .build(); - VnodeRepairStates actualVnodeRepairStatesAfter = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, repairStateSnapshot); + VnodeRepairStates actualVnodeRepairStatesAfter = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, repairStateSnapshot, System.currentTimeMillis()); assertThat(actualVnodeRepairStatesAfter).isEqualTo(expectedVnodeRepairStatesAfter); } @@ -307,7 +307,7 @@ public void testWithHistoryAndPreviousOnlyIteratesOverDiff() throws UnknownHostE VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, repairHistoryProvider); - VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null); + VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null, System.currentTimeMillis()); assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates); @@ -323,7 +323,7 @@ public void testWithHistoryAndPreviousOnlyIteratesOverDiff() throws UnknownHostE when(repairHistoryProvider.iterate(eq(TABLE_REFERENCE), any(long.class), eq(firstSnapshotCreatedAt), any(Predicate.class))).thenReturn(secondIterateRepairEntries.iterator()); VnodeRepairStates updatedVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, - firstRepairStateSnapshot); + firstRepairStateSnapshot, System.currentTimeMillis()); assertThat(updatedVnodeRepairStates).isEqualTo(expectedVnodeRepairStates); @@ -342,7 +342,7 @@ public void testWithHistoryAndPreviousOnlyIteratesOverDiff() throws UnknownHostE when(repairHistoryProvider.iterate(eq(TABLE_REFERENCE), any(long.class), eq(secondSnapshotCreatedAt), any(Predicate.class))).thenReturn(thirdIterateRepairEntries.iterator()); updatedVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, - secondRepairStateSnapshot); + secondRepairStateSnapshot, System.currentTimeMillis()); expectedVnodeRepairStates = VnodeRepairStates.newBuilder(Arrays.asList( new VnodeRepairState(longTokenRange1, replicas, updateRange1RepairedAt),