diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateImpl.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateImpl.java index 1b1e28381..89bbe70da 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateImpl.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateImpl.java @@ -67,10 +67,11 @@ public final void update() { RepairStateSnapshot oldRepairStateSnapshot = myRepairStateSnapshot.get(); + long now = System.currentTimeMillis(); if (oldRepairStateSnapshot == null - || oldRepairStateSnapshot.lastCompletedAt() < System.currentTimeMillis() - myRepairConfiguration.getRepairIntervalInMs()) + || oldRepairStateSnapshot.lastCompletedAt() < now - myRepairConfiguration.getRepairIntervalInMs()) { - RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot); + RepairStateSnapshot newRepairStateSnapshot = generateNewRepairState(oldRepairStateSnapshot, now); if (myRepairStateSnapshot.compareAndSet(oldRepairStateSnapshot, newRepairStateSnapshot)) { myTableRepairMetrics.lastRepairedAt(myTableReference, newRepairStateSnapshot.lastCompletedAt()); @@ -98,14 +99,14 @@ public RepairStateSnapshot getSnapshot() return myRepairStateSnapshot.get(); } - private RepairStateSnapshot generateNewRepairState(RepairStateSnapshot old) + private RepairStateSnapshot generateNewRepairState(RepairStateSnapshot old, long now) { - VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old); + VnodeRepairStates vnodeRepairStates = myVnodeRepairStateFactory.calculateNewState(myTableReference, old, now); - return generateSnapshotForVnode(vnodeRepairStates); + return generateSnapshotForVnode(vnodeRepairStates, now); } - private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepairStates) + private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepairStates, long createdAt) { long repairedAt = calculateRepairedAt(vnodeRepairStates); @@ -121,6 +122,7 @@ private RepairStateSnapshot generateSnapshotForVnode(VnodeRepairStates vnodeRepa return RepairStateSnapshot.newBuilder() .withLastCompletedAt(repairedAt) .withVnodeRepairStates(updatedVnodeRepairStates) + .withCreatedAt(createdAt) .withReplicaRepairGroups(replicaRepairGroups) .build(); } diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactory.java index 5948af837..5c48719bb 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactory.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactory.java @@ -28,7 +28,8 @@ public interface VnodeRepairStateFactory * * @param tableReference The table to calculate the new repair state for vnodes. * @param previous The previous repair state or null if non exists. + * @param iterateToTime The time to iterate repair entries to. * @return The calculated repair state. */ - VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous); + VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous, long iterateToTime); } diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java index 75be3c82a..1d2a7d5ff 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java @@ -45,23 +45,22 @@ public VnodeRepairStateFactoryImpl(ReplicationState replicationState, RepairHist } @Override - public VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous) + public VnodeRepairStates calculateNewState(TableReference tableReference, RepairStateSnapshot previous, long iterateToTime) { Map> tokenRangeToReplicaMap = myReplicationState.getTokenRangeToReplicas(tableReference); long lastRepairedAt = previousLastRepairedAt(previous, tokenRangeToReplicaMap); - long now = System.currentTimeMillis(); Iterator repairEntryIterator; if (lastRepairedAt == VnodeRepairState.UNREPAIRED) { LOG.debug("No last repaired at found for {}, iterating over all repair entries", tableReference); - repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap)); + repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap)); } else { - LOG.debug("Table {} snapshot created at {}, iterating repir entries until that time", tableReference, previous.getCreatedAt()); - repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap)); + LOG.debug("Table {} snapshot created at {}, iterating repair entries until that time", tableReference, previous.getCreatedAt()); + repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, iterateToTime, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap)); } return generateVnodeRepairStates(lastRepairedAt, previous, repairEntryIterator, tokenRangeToReplicaMap); diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestRepairStateImpl.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestRepairStateImpl.java index 787215199..11b0a11a4 100644 --- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestRepairStateImpl.java +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestRepairStateImpl.java @@ -16,6 +16,7 @@ import static com.ericsson.bss.cassandra.ecchronos.core.MockTableReferenceFactory.tableReference; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; @@ -88,7 +89,7 @@ public void testInitialEmptyState() VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Collections.singletonList(vnodeRepairState)) .build(); - when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull())).thenReturn(vnodeRepairStates); + when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(), any(long.class))).thenReturn(vnodeRepairStates); when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList()); RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration, @@ -123,7 +124,7 @@ public void testPartiallyRepaired() VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Arrays.asList(vnodeRepairState, repairedVnodeRepairState)) .build(); - when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull())).thenReturn(vnodeRepairStates); + when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(), any(long.class))).thenReturn(vnodeRepairStates); when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Collections.singletonList(mockReplicaRepairGroup)); RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration, @@ -158,7 +159,7 @@ public void testUpdateRepaired() VnodeRepairStates vnodeRepairStates = VnodeRepairStatesImpl.newBuilder(Collections.singletonList(vnodeRepairState)) .build(); - when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull())).thenReturn(vnodeRepairStates); + when(mockVnodeRepairStateFactory.calculateNewState(eq(tableReference), isNull(), any(long.class))).thenReturn(vnodeRepairStates); when(mockReplicaRepairGroupFactory.generateReplicaRepairGroups(repairGroupCaptor.capture())).thenReturn(Lists.emptyList()); RepairState repairState = new RepairStateImpl(tableReference, repairConfiguration, diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java index 3a13a7344..c62b47d18 100644 --- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java @@ -470,7 +470,7 @@ private void assertNewState(VnodeRepairStateFactory factory, RepairStateSnapshot private void assertNewState(VnodeRepairStateFactory factory, RepairStateSnapshot previous, Class expectedClass, Collection expectedStates) { - VnodeRepairStates newStates = factory.calculateNewState(TABLE_REFERENCE, previous); + VnodeRepairStates newStates = factory.calculateNewState(TABLE_REFERENCE, previous, System.currentTimeMillis()); assertThat(newStates).isInstanceOf(expectedClass); Collection vnodeRepairStates = newStates.getVnodeRepairStates();