Skip to content

Commit

Permalink
Insert into repair history only on session finish (#566)
Browse files Browse the repository at this point in the history
Closes #565
  • Loading branch information
masokol authored Aug 31, 2023
1 parent 0312713 commit 80ac051
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 115 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Version 5.0.0 (Not yet released)

* Insert into repair history only on session finish - Issue #565
* Use Caffeine caches instead of Guava - Issue #534
* Validate TLS config for JMX and CQL - Issue #529
* Add support for incremental repairs - Issue #31
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -77,8 +76,7 @@ public final class EccRepairHistory implements RepairHistory, RepairHistoryProvi

private final PreparedStatement iterateStatement;

private final PreparedStatement initiateStatement;
private final PreparedStatement finishStatement;
private final PreparedStatement createStatement;

private EccRepairHistory(final Builder builder)
{
Expand All @@ -95,7 +93,7 @@ private EccRepairHistory(final Builder builder)
"Replication state must be set");
lookbackTimeInMs = builder.lookbackTimeInMs;

initiateStatement = session.prepare(QueryBuilder.insertInto(builder.keyspaceName, "repair_history")
createStatement = session.prepare(QueryBuilder.insertInto(builder.keyspaceName, "repair_history")
.value(COLUMN_TABLE_ID, bindMarker())
.value(COLUMN_NODE_ID, bindMarker())
.value(COLUMN_REPAIR_ID, bindMarker())
Expand All @@ -105,16 +103,9 @@ private EccRepairHistory(final Builder builder)
.value(COLUMN_RANGE_END, bindMarker())
.value(COLUMN_STATUS, bindMarker())
.value(COLUMN_STARTED_AT, bindMarker())
.value(COLUMN_FINISHED_AT, bindMarker())
.build().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));

finishStatement = session.prepare(QueryBuilder.update(builder.keyspaceName, "repair_history")
.setColumn(COLUMN_STATUS, bindMarker())
.setColumn(COLUMN_FINISHED_AT, bindMarker())
.whereColumn(COLUMN_TABLE_ID).isEqualTo(bindMarker())
.whereColumn(COLUMN_NODE_ID).isEqualTo(bindMarker())
.whereColumn(COLUMN_REPAIR_ID).isEqualTo(bindMarker())
.build().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));

iterateStatement = session.prepare(QueryBuilder.selectFrom(builder.keyspaceName, "repair_history")
.columns(COLUMN_STARTED_AT, COLUMN_FINISHED_AT, COLUMN_STATUS, COLUMN_RANGE_BEGIN, COLUMN_RANGE_END)
.whereColumn(COLUMN_TABLE_ID).isEqualTo(bindMarker())
Expand Down Expand Up @@ -295,6 +286,7 @@ class RepairSessionImpl implements RepairSession
private final Set<UUID> participants;
private final AtomicReference<SessionState> sessionState = new AtomicReference<>(SessionState.NO_STATE);
private final AtomicReference<UUID> repairId = new AtomicReference<>(null);
private final AtomicReference<Instant> startedAt = new AtomicReference<>(null);

RepairSessionImpl(final UUID aTableId,
final UUID aNodeId,
Expand All @@ -320,29 +312,26 @@ UUID getId()
@Override
public void start()
{
repairId.compareAndSet(null, Uuids.timeBased());
transitionTo(SessionState.STARTED);
String rangeBegin = Long.toString(range.start);
String rangeEnd = Long.toString(range.end);
Date startedAt = new Date(Uuids.unixTimestamp(repairId.get()));

insertWithRetry(participant -> insertStart(rangeBegin, rangeEnd, startedAt, participant));
startedAt.compareAndSet(null, Instant.now());
}

/**
* Transition to state DONE, as long as the previous status was STARTED. Set the finished-at time to the current timestamp.
*
* @param repairStatus The previous status
* @param repairStatus The repair status
*/
@Override
public void finish(final RepairStatus repairStatus)
{
Preconditions.checkArgument(!RepairStatus.STARTED.equals(repairStatus),
"Repair status must change from started");
transitionTo(SessionState.DONE);
Date finishedAt = new Date(System.currentTimeMillis());

insertWithRetry(participant -> insertFinish(repairStatus, finishedAt, participant));
String rangeBegin = Long.toString(range.start);
String rangeEnd = Long.toString(range.end);
Instant finishedAt = Instant.now();
repairId.compareAndSet(null, Uuids.timeBased());
insertWithRetry(participant -> insertFinish(rangeBegin, rangeEnd, repairStatus, finishedAt, participant));
}

private void insertWithRetry(final Function<UUID, CompletionStage<AsyncResultSet>> insertFunction)
Expand Down Expand Up @@ -386,26 +375,14 @@ private void insertWithRetry(final Function<UUID, CompletionStage<AsyncResultSet
}
}

private CompletionStage<AsyncResultSet> insertStart(final String rangeBegin,
final String rangeEnd,
final Date startedAt,
final UUID participant)
{
Statement statement = initiateStatement.bind(tableId, participant, repairId.get(), jobId, nodeId,
rangeBegin,
rangeEnd, RepairStatus.STARTED.toString(), startedAt.toInstant());
return executeAsync(statement);
}

private CompletionStage<AsyncResultSet> insertFinish(final RepairStatus repairStatus,
final Date finishedAt,
private CompletionStage<AsyncResultSet> insertFinish(final String rangeBegin,
final String rangeEnd,
final RepairStatus repairStatus,
final Instant finishedAt,
final UUID participant)
{
Statement statement = finishStatement.bind(repairStatus.toString(),
finishedAt.toInstant(),
tableId,
participant,
repairId.get());
Statement statement = createStatement.bind(tableId, participant, repairId.get(), jobId, nodeId, rangeBegin,
rangeEnd, repairStatus.toString(), startedAt.get(), finishedAt);
return executeAsync(statement);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,7 @@ public void testStartAndFinishSession()

RepairHistory.RepairSession repairSession = repairHistory.newSession(tableReference, jobId, range,
participants);

repairSession.start();
assertCorrectStart(repairSession, jobId, range, participants);

repairSession.finish(RepairStatus.SUCCESS);
assertCorrectFinish(repairSession, jobId, range, participants);
Expand Down Expand Up @@ -187,11 +185,7 @@ public void testStartAndFailSession()
RepairHistory.RepairSession repairSession = repairHistory
.newSession(tableReference, jobId, range, participants);
repairSession.start();

assertCorrectStart(repairSession, jobId, range, participants);

repairSession.finish(RepairStatus.FAILED);

assertFailedFinish(repairSession, jobId, range, participants);
}

Expand All @@ -215,31 +209,19 @@ public void testInsertAndIterate()
.newSession(tableReference, jobId, range, participants);
repairSession.start();

long to = System.currentTimeMillis();

// Assert that we have a started session
Iterator<RepairEntry> repairEntryIterator = repairHistoryProvider
.iterate(tableReference, to, from, Predicates.alwaysTrue());
assertThat(repairEntryIterator.hasNext()).isTrue();
RepairEntry repairEntry = repairEntryIterator.next();
assertThat(repairEntry.getParticipants()).isEqualTo(participants);
assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.STARTED);
assertThat(repairEntry.getRange()).isEqualTo(range);
assertThat(repairEntry.getStartedAt()).isBetween(from, to);
assertThat(repairEntry.getFinishedAt()).isEqualTo(-1L);
assertThat(repairEntryIterator.hasNext()).isFalse();

long beforeFinish = System.currentTimeMillis();
repairSession.finish(RepairStatus.SUCCESS);
long afterFinish = System.currentTimeMillis();

// Assert that the session has finished
repairEntryIterator = repairHistoryProvider.iterate(tableReference, to, from, Predicates.alwaysTrue());
Iterator<RepairEntry> repairEntryIterator = repairHistoryProvider.iterate(tableReference, afterFinish, beforeFinish, Predicates.alwaysTrue());
assertThat(repairEntryIterator.hasNext()).isTrue();
repairEntry = repairEntryIterator.next();
RepairEntry repairEntry = repairEntryIterator.next();
assertThat(repairEntry.getParticipants()).isEqualTo(participants);
assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.SUCCESS);
assertThat(repairEntry.getRange()).isEqualTo(range);
assertThat(repairEntry.getStartedAt()).isBetween(from, to);
assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(to);
assertThat(repairEntry.getStartedAt()).isBetween(beforeFinish, afterFinish);
assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(beforeFinish);
assertThat(repairEntryIterator.hasNext()).isFalse();
}

Expand Down Expand Up @@ -327,52 +309,34 @@ public void testInsertAndIterateClusterWide()
.newSession(tableReference, jobId, range2, participants2);
repairSession2.start();

long to = System.currentTimeMillis();

// Assert that we have started sessions
Iterator<RepairEntry> repairEntryIterator = repairHistoryProvider
.iterate(localId, tableReference, to, from, Predicates.alwaysTrue());
assertThat(repairEntryIterator.hasNext()).isTrue();
RepairEntry repairEntry = repairEntryIterator.next();
assertThat(repairEntry.getParticipants()).isEqualTo(participants1);
assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.STARTED);
assertThat(repairEntry.getRange()).isEqualTo(range1);
assertThat(repairEntry.getStartedAt()).isBetween(from, to);
assertThat(repairEntry.getFinishedAt()).isEqualTo(-1L);
assertThat(repairEntryIterator.hasNext()).isFalse();

repairEntryIterator = repairHistoryProvider.iterate(remoteNodeId, tableReference, to, from, Predicates.alwaysTrue());
assertThat(repairEntryIterator.hasNext()).isTrue();
repairEntry = repairEntryIterator.next();
assertThat(repairEntry.getParticipants()).isEqualTo(participants2);
assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.STARTED);
assertThat(repairEntry.getRange()).isEqualTo(range2);
assertThat(repairEntry.getStartedAt()).isBetween(from, to);
assertThat(repairEntry.getFinishedAt()).isEqualTo(-1L);
assertThat(repairEntryIterator.hasNext()).isFalse();
long beforeFinish = System.currentTimeMillis();

repairSession1.finish(RepairStatus.SUCCESS);
repairSession2.finish(RepairStatus.SUCCESS);

long afterFinish = System.currentTimeMillis();

// Assert that the sessions have finished
repairEntryIterator = repairHistoryProvider.iterate(localId, tableReference, to, from, Predicates.alwaysTrue());
Iterator<RepairEntry> repairEntryIterator = repairHistoryProvider.iterate(localId, tableReference, afterFinish,
beforeFinish, Predicates.alwaysTrue());
assertThat(repairEntryIterator.hasNext()).isTrue();
repairEntry = repairEntryIterator.next();
RepairEntry repairEntry = repairEntryIterator.next();
assertThat(repairEntry.getParticipants()).isEqualTo(participants1);
assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.SUCCESS);
assertThat(repairEntry.getRange()).isEqualTo(range1);
assertThat(repairEntry.getStartedAt()).isBetween(from, to);
assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(to);
assertThat(repairEntry.getStartedAt()).isBetween(from, afterFinish);
assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(beforeFinish);
assertThat(repairEntryIterator.hasNext()).isFalse();

repairEntryIterator = repairHistoryProvider.iterate(remoteNodeId, tableReference, to, from, Predicates.alwaysTrue());
repairEntryIterator = repairHistoryProvider.iterate(remoteNodeId, tableReference, afterFinish, beforeFinish,
Predicates.alwaysTrue());
assertThat(repairEntryIterator.hasNext()).isTrue();
repairEntry = repairEntryIterator.next();
assertThat(repairEntry.getParticipants()).isEqualTo(participants2);
assertThat(repairEntry.getStatus()).isEqualTo(RepairStatus.SUCCESS);
assertThat(repairEntry.getRange()).isEqualTo(range2);
assertThat(repairEntry.getStartedAt()).isBetween(from, to);
assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(to);
assertThat(repairEntry.getStartedAt()).isBetween(from, afterFinish);
assertThat(repairEntry.getFinishedAt()).isGreaterThanOrEqualTo(beforeFinish);
assertThat(repairEntryIterator.hasNext()).isFalse();
}

Expand All @@ -389,7 +353,6 @@ public void testMultipleInvocationsThrowsException()
participants);

repairSession.start();
assertCorrectStart(repairSession, jobId, range, participants);

// We can only start a session once
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(repairSession::start);
Expand All @@ -407,18 +370,6 @@ public void testMultipleInvocationsThrowsException()
.isThrownBy(() -> repairSession.finish(RepairStatus.UNKNOWN));
}

private void assertCorrectStart(RepairHistory.RepairSession repairSession, UUID jobId, LongTokenRange range,
Set<DriverNode> participants)
{
for (DriverNode node : participants)
{
UUID nodeId = node.getId();
EccEntry expectedEntry = startedSession(nodeId, internalSession(repairSession).getId(), jobId, range);
EccEntry actualEntry = fromDb(nodeId, repairSession);
assertCorrectStartEntry(actualEntry, expectedEntry);
}
}

private void assertFailedFinish(RepairHistory.RepairSession repairSession, UUID jobId, LongTokenRange range,
Set<DriverNode> participants)
{
Expand All @@ -445,15 +396,8 @@ private void assertCorrectFinish(RepairHistory.RepairSession repairSession, UUID
}
}

private void assertCorrectStartEntry(EccEntry actual, EccEntry expected)
{
assertThat(actual).isEqualTo(expected);
assertThat(actual.startedAt).isEqualTo(Uuids.unixTimestamp(actual.repairId));
}

private void assertCorrectEndEntry(EccEntry actual, EccEntry expected)
{
assertCorrectStartEntry(actual, expected);
assertThat(actual.finishedAt).isBetween(actual.startedAt, expected.finishedAt);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ private void injectRepairHistory(TableReference tableReference, long timestamp,
statement = QueryBuilder.insertInto("ecchronos", "repair_history")
.value("table_id", literal(tableReference.getId()))
.value("node_id", literal(myLocalNode.getId()))
.value("repair_id", literal(Uuids.startOf(started_at)))
.value("repair_id", literal(Uuids.startOf(finished_at)))
.value("job_id", literal(tableReference.getId()))
.value("coordinator_id", literal(myLocalNode.getId()))
.value("range_begin", literal(range_begin))
Expand Down

0 comments on commit 80ac051

Please sign in to comment.