diff --git a/CHANGES.md b/CHANGES.md index 79b9491bc..5aec95ae4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,7 @@ #### Merged from 1.0 +* Skip unnecessary reads from repair history - Issue #548 * Fix repair job priority - Issue #515 * Fix malformed IPv6 for JMX - Issue #306 @@ -41,6 +42,7 @@ ## Version 1.0.8 (Not yet released) +* Skip unnecessary reads from repair history - Issue #548 * Fix repair job priority - Issue #515 * Fix malformed IPv6 for JMX - Issue #306 * Step karaf to 4.2.8 diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateSnapshot.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateSnapshot.java index 8dfd1f801..952f34347 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateSnapshot.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/RepairStateSnapshot.java @@ -33,11 +33,13 @@ public class RepairStateSnapshot { private final boolean canRepair; private final long myLastRepairedAt; + private final long myCreatedAt; private final ImmutableList myReplicaRepairGroup; private final VnodeRepairStates myVnodeRepairStates; private RepairStateSnapshot(Builder builder) { + myCreatedAt = builder.myCreatedAt; myLastRepairedAt = builder.myLastRepairedAt; myReplicaRepairGroup = builder.myReplicaRepairGroup; myVnodeRepairStates = builder.myVnodeRepairStates; @@ -45,6 +47,15 @@ private RepairStateSnapshot(Builder builder) canRepair = !myReplicaRepairGroup.isEmpty(); } + /** + * Get the time this snapshot was created. + * @return The time this snapshot was created. + */ + public long getCreatedAt() + { + return myCreatedAt; + } + /** * Check if a repair can be performed based on the current state. * @@ -94,6 +105,7 @@ public static Builder newBuilder() public static class Builder { private Long myLastRepairedAt; + private long myCreatedAt = System.currentTimeMillis(); private ImmutableList myReplicaRepairGroup; private VnodeRepairStates myVnodeRepairStates; @@ -115,6 +127,12 @@ public Builder withVnodeRepairStates(VnodeRepairStates vnodeRepairStates) return this; } + public Builder withCreatedAt(long createdAt) + { + myCreatedAt = createdAt; + return this; + } + public RepairStateSnapshot build() { return new RepairStateSnapshot(this); diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java index 951dbbbba..ded716e70 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/VnodeRepairStateFactoryImpl.java @@ -61,8 +61,8 @@ public VnodeRepairStates calculateNewState(TableReference tableReference, Repair } else { - LOG.debug("Table {} last repaired at {}, iterating repair entries until that time", tableReference, lastRepairedAt); - repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, lastRepairedAt, (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap)); + LOG.debug("Table {} snapshot created at {}, iterating repir entries until that time", tableReference, previous.getCreatedAt()); + repairEntryIterator = myRepairHistoryProvider.iterate(tableReference, now, previous.getCreatedAt(), (repairEntry) -> acceptRepairEntries(repairEntry, tokenRangeToReplicaMap)); } return generateVnodeRepairStates(lastRepairedAt, previous, repairEntryIterator, tokenRangeToReplicaMap); diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java index 3c8d980d0..8d1ad2515 100644 --- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestVnodeRepairStateFactoryImpl.java @@ -28,6 +28,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -37,8 +38,10 @@ import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -268,6 +271,86 @@ public void testWithHistoryAndPreviousAfterScaleOut() throws UnknownHostExceptio assertThat(actualVnodeRepairStatesAfter).isEqualTo(expectedVnodeRepairStatesAfter); } + @Test + public void testWithHistoryAndPreviousOnlyIteratesOverDiff() throws UnknownHostException + { + MockedHost host1 = new MockedHost("127.0.0.1"); + MockedHost host2 = new MockedHost("127.0.0.2"); + + LongTokenRange longTokenRange1 = new LongTokenRange(1, 2); + LongTokenRange longTokenRange2 = new LongTokenRange(2, 3); + ImmutableSet replicas = getReplicas(host1, host2); + + Map> tokenToHostMap = new HashMap<>(); + tokenToHostMap.put(longTokenRange1, replicas); + tokenToHostMap.put(longTokenRange2, replicas); + + long range1RepairedAt = 1; + long range2RepairedAt = 2; + ImmutableSet replicaAddresses = getReplicaAddresses(host1, host2); + RepairEntry repairEntry1 = new RepairEntry(longTokenRange1, range1RepairedAt, replicaAddresses, "SUCCESS"); + RepairEntry repairEntry2 = new RepairEntry(longTokenRange2, range2RepairedAt, replicaAddresses, "SUCCESS"); + List firstIterateRepairEntries = new ArrayList<>(); + firstIterateRepairEntries.add(repairEntry1); + firstIterateRepairEntries.add(repairEntry2); + + when(mockReplicationState.getTokenRangeToReplicas(eq(TABLE_REFERENCE))).thenReturn(tokenToHostMap); + RepairHistoryProvider repairHistoryProvider = mock(RepairHistoryProvider.class); + when(repairHistoryProvider.iterate(eq(TABLE_REFERENCE), any(long.class), any(Predicate.class))).thenReturn( + firstIterateRepairEntries.iterator()); + + VnodeRepairStates expectedVnodeRepairStates = VnodeRepairStates.newBuilder(Arrays.asList( + new VnodeRepairState(longTokenRange1, replicas, range1RepairedAt), + new VnodeRepairState(longTokenRange2, replicas, range2RepairedAt))) + .build(); + + VnodeRepairStateFactory vnodeRepairStateFactory = new VnodeRepairStateFactoryImpl(mockReplicationState, + repairHistoryProvider); + + VnodeRepairStates actualVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, null); + + assertThat(actualVnodeRepairStates).isEqualTo(expectedVnodeRepairStates); + + // Check that vnodes keep their states from old snapshot even if iterator is empty + long firstSnapshotCreatedAt = 3; + RepairStateSnapshot firstRepairStateSnapshot = RepairStateSnapshot.newBuilder() + .withVnodeRepairStates(actualVnodeRepairStates) + .withLastRepairedAt(range1RepairedAt) + .withReplicaRepairGroups(new ArrayList<>()) + .withCreatedAt(firstSnapshotCreatedAt).build(); + List secondIterateRepairEntries = new ArrayList<>(); + + when(repairHistoryProvider.iterate(eq(TABLE_REFERENCE), any(long.class), eq(firstSnapshotCreatedAt), + any(Predicate.class))).thenReturn(secondIterateRepairEntries.iterator()); + VnodeRepairStates updatedVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, + firstRepairStateSnapshot); + + assertThat(updatedVnodeRepairStates).isEqualTo(expectedVnodeRepairStates); + + // Check that vnodes get updated for the new repair entries and old are kept from old snapshot + long secondSnapshotCreatedAt = 5; + RepairStateSnapshot secondRepairStateSnapshot = RepairStateSnapshot.newBuilder() + .withVnodeRepairStates(actualVnodeRepairStates) + .withLastRepairedAt(range1RepairedAt) + .withReplicaRepairGroups(new ArrayList<>()) + .withCreatedAt(secondSnapshotCreatedAt).build(); + long updateRange1RepairedAt = 4; + RepairEntry repairEntry3 = new RepairEntry(longTokenRange1, updateRange1RepairedAt, replicaAddresses, "SUCCESS"); + List thirdIterateRepairEntries = new ArrayList<>(); + thirdIterateRepairEntries.add(repairEntry3); + + when(repairHistoryProvider.iterate(eq(TABLE_REFERENCE), any(long.class), eq(secondSnapshotCreatedAt), + any(Predicate.class))).thenReturn(thirdIterateRepairEntries.iterator()); + updatedVnodeRepairStates = vnodeRepairStateFactory.calculateNewState(TABLE_REFERENCE, + secondRepairStateSnapshot); + + expectedVnodeRepairStates = VnodeRepairStates.newBuilder(Arrays.asList( + new VnodeRepairState(longTokenRange1, replicas, updateRange1RepairedAt), + new VnodeRepairState(longTokenRange2, replicas, range2RepairedAt))) + .build(); + assertThat(updatedVnodeRepairStates).isEqualTo(expectedVnodeRepairStates); + } + private ImmutableSet getReplicas(MockedHost... hosts) { return ImmutableSet.copyOf(Lists.newArrayList(hosts).stream().map(MockedHost::getHost).collect(Collectors.toSet()));