From 80ac051e25171c0e9a572a94eea4bc3fad27c931 Mon Sep 17 00:00:00 2001 From: masokol <97948057+masokol@users.noreply.github.com> Date: Thu, 31 Aug 2023 09:37:42 +0200 Subject: [PATCH] Insert into repair history only on session finish (#566) Closes #565 --- CHANGES.md | 1 + .../core/repair/state/EccRepairHistory.java | 57 ++++-------- .../repair/state/TestEccRepairHistory.java | 92 ++++--------------- .../ecchronos/standalone/ITSchedules.java | 2 +- 4 files changed, 37 insertions(+), 115 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e4e7eae60..c6657c593 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Version 5.0.0 (Not yet released) +* Insert into repair history only on session finish - Issue #565 * Use Caffeine caches instead of Guava - Issue #534 * Validate TLS config for JMX and CQL - Issue #529 * Add support for incremental repairs - Issue #31 diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/EccRepairHistory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/EccRepairHistory.java index 4336bf658..68b46553e 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/EccRepairHistory.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/EccRepairHistory.java @@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory; import java.time.Instant; -import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -77,8 +76,7 @@ public final class EccRepairHistory implements RepairHistory, RepairHistoryProvi private final PreparedStatement iterateStatement; - private final PreparedStatement initiateStatement; - private final PreparedStatement finishStatement; + private final PreparedStatement createStatement; private EccRepairHistory(final Builder builder) { @@ -95,7 +93,7 @@ private EccRepairHistory(final Builder builder) "Replication state must be set"); lookbackTimeInMs = builder.lookbackTimeInMs; - initiateStatement = session.prepare(QueryBuilder.insertInto(builder.keyspaceName, "repair_history") + createStatement = session.prepare(QueryBuilder.insertInto(builder.keyspaceName, "repair_history") .value(COLUMN_TABLE_ID, bindMarker()) .value(COLUMN_NODE_ID, bindMarker()) .value(COLUMN_REPAIR_ID, bindMarker()) @@ -105,16 +103,9 @@ private EccRepairHistory(final Builder builder) .value(COLUMN_RANGE_END, bindMarker()) .value(COLUMN_STATUS, bindMarker()) .value(COLUMN_STARTED_AT, bindMarker()) + .value(COLUMN_FINISHED_AT, bindMarker()) .build().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)); - finishStatement = session.prepare(QueryBuilder.update(builder.keyspaceName, "repair_history") - .setColumn(COLUMN_STATUS, bindMarker()) - .setColumn(COLUMN_FINISHED_AT, bindMarker()) - .whereColumn(COLUMN_TABLE_ID).isEqualTo(bindMarker()) - .whereColumn(COLUMN_NODE_ID).isEqualTo(bindMarker()) - .whereColumn(COLUMN_REPAIR_ID).isEqualTo(bindMarker()) - .build().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)); - iterateStatement = session.prepare(QueryBuilder.selectFrom(builder.keyspaceName, "repair_history") .columns(COLUMN_STARTED_AT, COLUMN_FINISHED_AT, COLUMN_STATUS, COLUMN_RANGE_BEGIN, COLUMN_RANGE_END) .whereColumn(COLUMN_TABLE_ID).isEqualTo(bindMarker()) @@ -295,6 +286,7 @@ class RepairSessionImpl implements RepairSession private final Set participants; private final AtomicReference sessionState = new AtomicReference<>(SessionState.NO_STATE); private final AtomicReference repairId = new AtomicReference<>(null); + private final AtomicReference startedAt = new AtomicReference<>(null); RepairSessionImpl(final UUID aTableId, final UUID aNodeId, @@ -320,19 +312,14 @@ UUID getId() @Override public void start() { - repairId.compareAndSet(null, Uuids.timeBased()); transitionTo(SessionState.STARTED); - String rangeBegin = Long.toString(range.start); - String rangeEnd = Long.toString(range.end); - Date startedAt = new Date(Uuids.unixTimestamp(repairId.get())); - - insertWithRetry(participant -> insertStart(rangeBegin, rangeEnd, startedAt, participant)); + startedAt.compareAndSet(null, Instant.now()); } /** * Transition to state DONE, as long as the previous status was STARTED. Set finished at to current timestamp. * - * @param repairStatus The previous status + * @param repairStatus The repair status */ @Override public void finish(final RepairStatus repairStatus) @@ -340,9 +327,11 @@ public void finish(final RepairStatus repairStatus) Preconditions.checkArgument(!RepairStatus.STARTED.equals(repairStatus), "Repair status must change from started"); transitionTo(SessionState.DONE); - Date finishedAt = new Date(System.currentTimeMillis()); - - insertWithRetry(participant -> insertFinish(repairStatus, finishedAt, participant)); + String rangeBegin = Long.toString(range.start); + String rangeEnd = Long.toString(range.end); + Instant finishedAt = Instant.now(); + repairId.compareAndSet(null, Uuids.timeBased()); + insertWithRetry(participant -> insertFinish(rangeBegin, rangeEnd, repairStatus, finishedAt, participant)); } private void insertWithRetry(final Function> insertFunction) @@ -386,26 +375,14 @@ private void insertWithRetry(final Function insertStart(final String rangeBegin, - final String rangeEnd, - final Date startedAt, - final UUID participant) - { - Statement statement = initiateStatement.bind(tableId, participant, repairId.get(), jobId, nodeId, - rangeBegin, - rangeEnd, RepairStatus.STARTED.toString(), startedAt.toInstant()); - return executeAsync(statement); - } - - private CompletionStage insertFinish(final RepairStatus repairStatus, - final Date finishedAt, + private CompletionStage insertFinish(final String rangeBegin, + final String rangeEnd, + final RepairStatus repairStatus, + final Instant finishedAt, final UUID participant) { - Statement statement = finishStatement.bind(repairStatus.toString(), - finishedAt.toInstant(), - tableId, - participant, - repairId.get()); + Statement statement = createStatement.bind(tableId, participant, repairId.get(), jobId, nodeId, rangeBegin, + rangeEnd, repairStatus.toString(), startedAt.get(), finishedAt); return executeAsync(statement); } diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestEccRepairHistory.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestEccRepairHistory.java index 42fa2cd27..47fb92aba 100644 --- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestEccRepairHistory.java +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/repair/state/TestEccRepairHistory.java @@ -152,9 +152,7 @@ public void testStartAndFinishSession() RepairHistory.RepairSession repairSession = repairHistory.newSession(tableReference, jobId, range, participants); - repairSession.start(); - assertCorrectStart(repairSession, jobId, range, participants); repairSession.finish(RepairStatus.SUCCESS); assertCorrectFinish(repairSession, jobId, range, participants); @@ -187,11 +185,7 @@ public void testStartAndFailSession() RepairHistory.RepairSession repairSession = repairHistory .newSession(tableReference, jobId, range, participants); repairSession.start(); - - assertCorrectStart(repairSession, jobId, range, participants); - repairSession.finish(RepairStatus.FAILED); - assertFailedFinish(repairSession, jobId, range, participants); } @@ -215,31 +209,19 @@ public void testInsertAndIterate() .newSession(tableReference, jobId, range, participants); repairSession.start(); - long to = System.currentTimeMillis(); - - // Assert that we have a started session - Iterator repairEntryIterator = repairHistoryProvider - .iterate(tableReference, to, from, Predicates.alwaysTrue()); - assertThat(repairEntryIterator.hasNext()).isTrue(); - RepairEntry repairEntry = repairEntryIterator.next(); - assertThat(repairEntry.getParticipants()).isEqualTo(participants); - assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.STARTED); - assertThat(repairEntry.getRange()).isEqualTo(range); - assertThat(repairEntry.getStartedAt()).isBetween(from, to); - assertThat(repairEntry.getFinishedAt()).isEqualTo(-1L); - assertThat(repairEntryIterator.hasNext()).isFalse(); - + long beforeFinish = System.currentTimeMillis(); repairSession.finish(RepairStatus.SUCCESS); + long afterFinish = System.currentTimeMillis(); // Assert that the session has finished - repairEntryIterator = repairHistoryProvider.iterate(tableReference, to, from, Predicates.alwaysTrue()); + Iterator repairEntryIterator = repairHistoryProvider.iterate(tableReference, afterFinish, beforeFinish, Predicates.alwaysTrue()); assertThat(repairEntryIterator.hasNext()).isTrue(); - repairEntry = repairEntryIterator.next(); + RepairEntry repairEntry = repairEntryIterator.next(); assertThat(repairEntry.getParticipants()).isEqualTo(participants); assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.SUCCESS); assertThat(repairEntry.getRange()).isEqualTo(range); - assertThat(repairEntry.getStartedAt()).isBetween(from, to); - assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(to); + assertThat(repairEntry.getStartedAt()).isBetween(beforeFinish, afterFinish); + assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(beforeFinish); assertThat(repairEntryIterator.hasNext()).isFalse(); } @@ -327,52 +309,34 @@ public void testInsertAndIterateClusterWide() .newSession(tableReference, jobId, range2, participants2); repairSession2.start(); - long to = System.currentTimeMillis(); - - // Assert that we have a started sessions - Iterator repairEntryIterator = repairHistoryProvider - .iterate(localId, tableReference, to, from, Predicates.alwaysTrue()); - assertThat(repairEntryIterator.hasNext()).isTrue(); - RepairEntry repairEntry = repairEntryIterator.next(); - assertThat(repairEntry.getParticipants()).isEqualTo(participants1); - assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.STARTED); - assertThat(repairEntry.getRange()).isEqualTo(range1); - assertThat(repairEntry.getStartedAt()).isBetween(from, to); - assertThat(repairEntry.getFinishedAt()).isEqualTo(-1L); - assertThat(repairEntryIterator.hasNext()).isFalse(); - - repairEntryIterator = repairHistoryProvider.iterate(remoteNodeId, tableReference, to, from, Predicates.alwaysTrue()); - assertThat(repairEntryIterator.hasNext()).isTrue(); - repairEntry = repairEntryIterator.next(); - assertThat(repairEntry.getParticipants()).isEqualTo(participants2); - assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.STARTED); - assertThat(repairEntry.getRange()).isEqualTo(range2); - assertThat(repairEntry.getStartedAt()).isBetween(from, to); - assertThat(repairEntry.getFinishedAt()).isEqualTo(-1L); - assertThat(repairEntryIterator.hasNext()).isFalse(); + long beforeFinish = System.currentTimeMillis(); repairSession1.finish(RepairStatus.SUCCESS); repairSession2.finish(RepairStatus.SUCCESS); + long afterFinish = System.currentTimeMillis(); + // Assert that the sessions has finished - repairEntryIterator = repairHistoryProvider.iterate(localId, tableReference, to, from, Predicates.alwaysTrue()); + Iterator repairEntryIterator = repairHistoryProvider.iterate(localId, tableReference, afterFinish, + beforeFinish, Predicates.alwaysTrue()); assertThat(repairEntryIterator.hasNext()).isTrue(); - repairEntry = repairEntryIterator.next(); + RepairEntry repairEntry = repairEntryIterator.next(); assertThat(repairEntry.getParticipants()).isEqualTo(participants1); assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.SUCCESS); assertThat(repairEntry.getRange()).isEqualTo(range1); - assertThat(repairEntry.getStartedAt()).isBetween(from, to); - assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(to); + assertThat(repairEntry.getStartedAt()).isBetween(from, afterFinish); + assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(beforeFinish); assertThat(repairEntryIterator.hasNext()).isFalse(); - repairEntryIterator = repairHistoryProvider.iterate(remoteNodeId, tableReference, to, from, Predicates.alwaysTrue()); + repairEntryIterator = repairHistoryProvider.iterate(remoteNodeId, tableReference, afterFinish, beforeFinish, + Predicates.alwaysTrue()); assertThat(repairEntryIterator.hasNext()).isTrue(); repairEntry = repairEntryIterator.next(); assertThat(repairEntry.getParticipants()).isEqualTo(participants2); assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.SUCCESS); assertThat(repairEntry.getRange()).isEqualTo(range2); - assertThat(repairEntry.getStartedAt()).isBetween(from, to); - assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(to); + assertThat(repairEntry.getStartedAt()).isBetween(from, afterFinish); + assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(beforeFinish); assertThat(repairEntryIterator.hasNext()).isFalse(); } @@ -389,7 +353,6 @@ public void testMultipleInvocationsThrowsException() participants); repairSession.start(); - assertCorrectStart(repairSession, jobId, range, participants); // We can only start a session once assertThatExceptionOfType(IllegalStateException.class).isThrownBy(repairSession::start); @@ -407,18 +370,6 @@ public void testMultipleInvocationsThrowsException() .isThrownBy(() -> repairSession.finish(RepairStatus.UNKNOWN)); } - private void assertCorrectStart(RepairHistory.RepairSession repairSession, UUID jobId, LongTokenRange range, - Set participants) - { - for (DriverNode node : participants) - { - UUID nodeId = node.getId(); - EccEntry expectedEntry = startedSession(nodeId, internalSession(repairSession).getId(), jobId, range); - EccEntry actualEntry = fromDb(nodeId, repairSession); - assertCorrectStartEntry(actualEntry, expectedEntry); - } - } - private void assertFailedFinish(RepairHistory.RepairSession repairSession, UUID jobId, LongTokenRange range, Set participants) { @@ -445,15 +396,8 @@ private void assertCorrectFinish(RepairHistory.RepairSession repairSession, UUID } } - private void assertCorrectStartEntry(EccEntry actual, EccEntry expected) - { - assertThat(actual).isEqualTo(expected); - assertThat(actual.startedAt).isEqualTo(Uuids.unixTimestamp(actual.repairId)); - } - private void assertCorrectEndEntry(EccEntry actual, EccEntry expected) { - assertCorrectStartEntry(actual, expected); assertThat(actual.finishedAt).isBetween(actual.startedAt, expected.finishedAt); } diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java index 16ab26f92..8728f473a 100644 --- a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java +++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java @@ -603,7 +603,7 @@ private void injectRepairHistory(TableReference tableReference, long timestamp, statement = QueryBuilder.insertInto("ecchronos", "repair_history") .value("table_id", literal(tableReference.getId())) .value("node_id", literal(myLocalNode.getId())) - .value("repair_id", literal(Uuids.startOf(started_at))) + .value("repair_id", literal(Uuids.startOf(finished_at))) .value("job_id", literal(tableReference.getId())) .value("coordinator_id", literal(myLocalNode.getId())) .value("range_begin", literal(range_begin))