Skip to content

Commit

Permalink
Merge branch 'ecchronos-3.0' into ecchronos-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
itskarlsson committed Jun 29, 2023
2 parents 452cf58 + 438e57f commit a009a6d
Show file tree
Hide file tree
Showing 15 changed files with 188 additions and 44 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
* Set name for all threads - Issue #459
* Bump apache-karaf from 4.3.6 to 4.3.8 (CVE-2022-40145)

### Merged from 1.0
* Fix repair job priority - Issue #515

## Version 4.0.2

* Add support for excluding metrics on tags - Issue #446
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.core.utils.DriverNode;
import com.ericsson.bss.cassandra.ecchronos.core.utils.logging.ThrottlingLogger;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -84,14 +85,20 @@ private void refreshNodeStatus()
{
synchronized (myRefreshLock)
{
if (shouldRefreshNodeStatus())
if (shouldRefreshNodeStatus() && !tryRefreshHostStates())
{
tryRefreshHostStates();
myHostStates.clear();
}
}
}
}

/**
 * Reset the last refresh marker to -1, the value that shouldRefreshNodeStatus() always
 * treats as "refresh is due", so the next status check triggers a refresh regardless of
 * the configured refresh interval.
 *
 * Exposed for tests only.
 */
@VisibleForTesting
void resetLastRefresh()
{
myLastRefresh = -1;
}

private boolean shouldRefreshNodeStatus()
{
return myLastRefresh == -1 || myLastRefresh < (System.currentTimeMillis() - myRefreshIntervalInMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,21 +266,43 @@ public long getRunOffset()
@Override
public boolean runnable()
{
if (super.runnable())
try
{
try
{
myRepairState.update();
}
catch (Exception e)
{
LOG.warn("Unable to check repair history, {}", this, e);
}
myRepairState.update();
}
catch (Exception e)
{
LOG.warn("Unable to check repair history, {}", this, e);
}

return myRepairState.getSnapshot().canRepair() && super.runnable();
}

/**
 * Determine the priority of this job from the repair groups in the current repair state snapshot.
 *
 * The oldest "last completed at" value among the snapshot's repair groups (never later than the
 * current time) is used as the reference point for the priority calculation.
 *
 * @return The priority calculated from the oldest repair group, or -1 if the snapshot reports
 *         that nothing can be repaired.
 */
@Override
public final int getRealPriority()
{
    final RepairStateSnapshot snapshot = myRepairState.getSnapshot();
    if (!snapshot.canRepair())
    {
        return -1;
    }

    // Start from "now" so that only groups completed in the past can lower the reference point.
    long oldestCompletedAt = System.currentTimeMillis();
    for (ReplicaRepairGroup group : snapshot.getRepairGroups())
    {
        oldestCompletedAt = Math.min(oldestCompletedAt, group.getLastCompletedAt());
    }
    return getRealPriority(oldestCompletedAt);
}

/**
* String representation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package com.ericsson.bss.cassandra.ecchronos.core.repair.state;

import java.util.Collection;

/**
* Utility class to determine collective repaired at information for {@link VnodeRepairStates}.
*
Expand Down Expand Up @@ -85,11 +87,16 @@ public String toString()
* @return RepairedAt
*/
public static RepairedAt generate(final VnodeRepairStates vnodeRepairStates)
{
// Delegate to the collection-based overload so both entry points share one implementation.
return RepairedAt.generate(vnodeRepairStates.getVnodeRepairStates());
}

public static RepairedAt generate(final Collection<VnodeRepairState> vnodeRepairStates)
{
long minRepairedAt = Long.MAX_VALUE;
long maxRepairedAt = Long.MIN_VALUE;

for (VnodeRepairState vnodeRepairState : vnodeRepairStates.getVnodeRepairStates())
for (VnodeRepairState vnodeRepairState : vnodeRepairStates)
{
long repairedAt = vnodeRepairState.lastRepairedAt();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ public class ReplicaRepairGroup implements Iterable<LongTokenRange>
{
private final ImmutableSet<DriverNode> myReplicas;
private final ImmutableList<LongTokenRange> myVnodes;
private final long myLastCompletedAt;

/**
* Constructor.
*
* @param replicas The nodes.
* @param vnodes The token ranges.
* @param lastCompletedAt When the last repair of this group completed.
*/
public ReplicaRepairGroup(final ImmutableSet<DriverNode> replicas, final ImmutableList<LongTokenRange> vnodes)
public ReplicaRepairGroup(final ImmutableSet<DriverNode> replicas, final ImmutableList<LongTokenRange> vnodes,
final long lastCompletedAt)
{
myReplicas = replicas;
myVnodes = vnodes;
myLastCompletedAt = lastCompletedAt;
}

/**
Expand All @@ -63,6 +67,16 @@ public Set<String> getDataCenters()
return myReplicas.stream().map(DriverNode::getDatacenter).collect(Collectors.toSet());
}

/**
 * Get the last completed at value that was supplied when this repair group was constructed.
 *
 * NOTE(review): presumably a timestamp in epoch milliseconds - confirm against the callers.
 *
 * @return Last completed at for this repair group.
 */
public long getLastCompletedAt()
{
return myLastCompletedAt;
}

/**
* Iterate.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,15 @@ public List<ReplicaRepairGroup> generateReplicaRepairGroups(final List<VnodeRepa

if (countedReplicaGroups.add(replicas))
{
List<LongTokenRange> commonVnodes = availableVnodeRepairStates.stream()
.filter(v -> v.getReplicas().equals(replicas))
List<VnodeRepairState> vnodesForReplicas = availableVnodeRepairStates.stream()
.filter(v -> v.getReplicas().equals(replicas)).collect(Collectors.toList());
RepairedAt repairedAt = RepairedAt.generate(vnodesForReplicas);
List<LongTokenRange> commonVnodes = vnodesForReplicas.stream()
.map(VnodeRepairState::getTokenRange)
.collect(Collectors.toList());

sortedRepairGroups.add(new ReplicaRepairGroup(replicas, ImmutableList.copyOf(commonVnodes)));
sortedRepairGroups.add(new ReplicaRepairGroup(replicas, ImmutableList.copyOf(commonVnodes),
repairedAt.getMinRepairedAt()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,16 @@ public Priority getPriority()
* @return The current priority or -1 if the job shouldn't run now.
* @see #getPriority()
*/
public final int getRealPriority()
public int getRealPriority()
{
return getRealPriority(getLastSuccessfulRun());
}

public final int getRealPriority(final long lastSuccessfulRun)
{
long now = System.currentTimeMillis();
long diff = now - (getLastSuccessfulRun() + myRunIntervalInMs - getRunOffset());

long diff = now - (lastSuccessfulRun + myRunIntervalInMs - getRunOffset());

if (diff < 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testIsInetAddressUp() throws UnknownHostException
}

@Test
public void testIsHostUp() throws UnknownHostException
public void testIsHostUp() throws IOException
{
InetSocketAddress expectedAddress = new InetSocketAddress(InetAddress.getLocalHost(), 9042);
Node expectedHost = mock(Node.class);
Expand All @@ -104,6 +104,10 @@ public void testIsHostUp() throws UnknownHostException
when(expectedHost.getBroadcastAddress()).thenReturn(Optional.of(expectedAddress));

assertThat(myHostStates.isUp(expectedHost)).isTrue();

myHostStates.resetLastRefresh();
when(myJmxProxyFactory.connect()).thenThrow(new IOException("Unittest"));
assertThat(myHostStates.isUp(expectedHost)).isFalse();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testMultipleDataCenterHosts()
private ReplicaRepairGroup generateReplicaRepairGroup(DriverNode... nodes)
{
LongTokenRange range = new LongTokenRange(1, 2);
return new ReplicaRepairGroup(ImmutableSet.copyOf(nodes), ImmutableList.of(range));
return new ReplicaRepairGroup(ImmutableSet.copyOf(nodes), ImmutableList.of(range), System.currentTimeMillis());
}

private DriverNode mockNode(String dataCenter, UUID nodeId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testMultipleDataCenters()
private ReplicaRepairGroup generateReplicaRepairGroup(DriverNode... nodes)
{
LongTokenRange range = new LongTokenRange(1, 2);
return new ReplicaRepairGroup(ImmutableSet.copyOf(nodes), ImmutableList.of(range));
return new ReplicaRepairGroup(ImmutableSet.copyOf(nodes), ImmutableList.of(range), System.currentTimeMillis());
}

private DriverNode mockNode(String dataCenter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testGetLock() throws LockException
Map<String, String> metadata = new HashMap<>();
metadata.put("keyspace", keyspaceName);
metadata.put("table", tableName);
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(ImmutableSet.of(), ImmutableList.of());
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(ImmutableSet.of(), ImmutableList.of(), System.currentTimeMillis());
Set<RepairResource> repairResources = Sets.newHashSet(new RepairResource("DC1", "my-resource"));

doReturn(repairResources).when(myRepairResourceFactory).getRepairResources(eq(replicaRepairGroup));
Expand All @@ -148,7 +148,7 @@ public void testGetLockWithThrowingLockingStrategy() throws LockException
Map<String, String> metadata = new HashMap<>();
metadata.put("keyspace", keyspaceName);
metadata.put("table", tableName);
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(ImmutableSet.of(), ImmutableList.of());
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(ImmutableSet.of(), ImmutableList.of(), System.currentTimeMillis());
Set<RepairResource> repairResources = Sets.newHashSet(new RepairResource("DC1", "my-resource"));

doReturn(repairResources).when(myRepairResourceFactory).getRepairResources(eq(replicaRepairGroup));
Expand All @@ -171,7 +171,10 @@ public void testGetRepairTask()

ImmutableSet<DriverNode> nodes = ImmutableSet.of(node);

ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range));
Set<LongTokenRange> ranges = new HashSet<>();
ranges.add(range);

ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range), System.currentTimeMillis());

RepairGroup repairGroup = builderFor(replicaRepairGroup).build(priority);

Expand Down Expand Up @@ -205,7 +208,7 @@ public void testGetRepairTaskWithSubRange()

ImmutableSet<DriverNode> nodes = ImmutableSet.of(node);

ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(vnode));
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(vnode), System.currentTimeMillis());

RepairGroup repairGroup = builderFor(replicaRepairGroup)
.withTokensPerRepair(tokensPerRange)
Expand Down Expand Up @@ -240,7 +243,7 @@ public void testGetPartialRepairTasks()
new LongTokenRange(2, 3),
new LongTokenRange(4, 5));

ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(ImmutableSet.of(node, node2), vnodes);
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(ImmutableSet.of(node, node2), vnodes, System.currentTimeMillis());

RepairGroup repairGroup = builderFor(replicaRepairGroup).build(priority);

Expand Down Expand Up @@ -270,7 +273,7 @@ public void testExecuteAllTasksSuccessful() throws ScheduledJobException
DriverNode node = mockNode("DC1");
LongTokenRange range = new LongTokenRange(1, 2);
ImmutableSet<DriverNode> nodes = ImmutableSet.of(node);
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range));
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range), System.currentTimeMillis());

RepairGroup repairGroup = spy(builderFor(replicaRepairGroup).build(priority));
RepairTask repairTask1 = mock(RepairTask.class);
Expand All @@ -295,7 +298,7 @@ public void testExecuteAllTasksFailed() throws ScheduledJobException
DriverNode node = mockNode("DC1");
LongTokenRange range = new LongTokenRange(1, 2);
ImmutableSet<DriverNode> nodes = ImmutableSet.of(node);
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range));
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range), System.currentTimeMillis());

RepairGroup repairGroup = spy(builderFor(replicaRepairGroup).build(priority));
RepairTask repairTask1 = mock(RepairTask.class);
Expand All @@ -320,7 +323,7 @@ public void testExecuteSomeTasksFailed() throws ScheduledJobException
DriverNode node = mockNode("DC1");
LongTokenRange range = new LongTokenRange(1, 2);
ImmutableSet<DriverNode> nodes = ImmutableSet.of(node);
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range));
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range), System.currentTimeMillis());

RepairGroup repairGroup = spy(builderFor(replicaRepairGroup).build(priority));
RepairTask repairTask1 = mock(RepairTask.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public void testExecute() throws Exception
Map<String, String> metadata = new HashMap<>();
metadata.put("keyspace", keyspaceName);
metadata.put("table", tableName);
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(ImmutableSet.of(withNode("127.0.0.1")),
ImmutableList.of(range(1, 2)));
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(ImmutableSet.of(withNode("127.0.0.1")), ImmutableList.of(range(1, 2)),
System.currentTimeMillis());
Set<RepairResource> repairResources = Sets.newHashSet(new RepairResource("DC1", "my-resource"));

when(mockJmxProxyFactory.connect()).thenReturn(new CustomJmxProxy((notificationListener, i) -> progressAndComplete(notificationListener, range(1, 2))));
Expand All @@ -144,8 +144,7 @@ public void testExecuteWithPolicyStoppingSecondTask() throws Exception
Map<String, String> metadata = new HashMap<>();
metadata.put("keyspace", keyspaceName);
metadata.put("table", tableName);
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(ImmutableSet.of(withNode("127.0.0.1")),
ImmutableList.of(range(1, 2), range(2, 3)));
ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(ImmutableSet.of(withNode("127.0.0.1")), ImmutableList.of(range(1, 2), range(2, 3)), System.currentTimeMillis());
Set<RepairResource> repairResources = Sets.newHashSet(new RepairResource("DC1", "my-resource"));
final AtomicBoolean shouldRun = new AtomicBoolean(true);

Expand Down
Loading

0 comments on commit a009a6d

Please sign in to comment.