Skip to content

Commit

Permalink
Improve hang preventing task #544 (#625)
Browse files Browse the repository at this point in the history
  • Loading branch information
SajidRiaz138 authored Dec 18, 2023
1 parent 92724e9 commit 287f3c1
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 8 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## Version 5.0.1 (Not yet released)

* Improve hang preventing task - Issue #544

## Version 5.0.0 (Not yet released)

* Build Ecchronos with Java 11 - Issue 616
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,13 @@ public interface JmxProxy extends Closeable
* @return The repaired ratio or 0 if it cannot be determined.
*/
double getPercentRepaired(TableReference tableReference);

/**
* Retrieves the current operational status of the local Cassandra node via JMX.
* Returns a string indicating the node's state (e.g., "NORMAL", "JOINING", "LEAVING", "MOVING")
* or "Unknown" if the status is undeterminable.
*
* @return A string representing the node's status.
*/
String getNodeStatus();
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,20 @@ public double getPercentRepaired(final TableReference tableReference)
}
return 0.0;
}

@Override
public String getNodeStatus()
{
try
{
return (String) myMbeanServerConnection.getAttribute(myStorageServiceObject, "OperationMode");
}
catch (Exception e)
{
LOG.error("Unable to retrieve node status {}", e.getMessage());
return "Unknown";
}
}
}

public static Builder builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public abstract class RepairTask implements NotificationListener
{
private static final Logger LOG = LoggerFactory.getLogger(RepairTask.class);
private static final Pattern RANGE_PATTERN = Pattern.compile("\\((-?[0-9]+),(-?[0-9]+)\\]");
private static final long HANG_PREVENT_TIME_IN_MINUTES = 30;
private static final int HEALTH_CHECK_INTERVAL = 10;
private final ScheduledExecutorService myExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("HangPreventingTask-%d").build());
private final CountDownLatch myLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -259,7 +259,8 @@ private void rescheduleHangPrevention()
{
myHangPreventFuture.cancel(false);
}
myHangPreventFuture = myExecutor.schedule(new HangPreventingTask(), HANG_PREVENT_TIME_IN_MINUTES,
// Schedule the first check to happen after 10 minutes
myHangPreventFuture = myExecutor.schedule(new HangPreventingTask(), HEALTH_CHECK_INTERVAL,
TimeUnit.MINUTES);
}

Expand Down Expand Up @@ -364,21 +365,46 @@ public enum ProgressEventType

private class HangPreventingTask implements Runnable
{
private static final int MAX_CHECKS = 3;
private static final String NORMAL_STATUS = "NORMAL";
private int checkCount = 0;

@Override
public void run()
{
try (JmxProxy proxy = myJmxProxyFactory.connect())
{
proxy.forceTerminateAllRepairSessions();
if (checkCount < MAX_CHECKS)
{
String nodeStatus = proxy.getNodeStatus();
if (!NORMAL_STATUS.equals(nodeStatus))
{
LOG.error("Local Cassandra node is down, aborting repair task.");
myLastError = new ScheduledJobException("Local Cassandra node is down");
proxy.forceTerminateAllRepairSessions();
myLatch.countDown(); // Signal to abort the repair task
}
else
{
checkCount++;
myHangPreventFuture = myExecutor.schedule(this, HEALTH_CHECK_INTERVAL, TimeUnit.MINUTES);
}
}
else
{
// After 3 successful checks or 30 minutes if still task is running terminate all repair sessions
proxy.forceTerminateAllRepairSessions();
myLatch.countDown();
}
}
catch (IOException e)
{
LOG.error("Unable to prevent hanging repair task: {}", this, e);
LOG.error("Unable to check node status or prevent hanging repair task: {}", this, e);
}
myLatch.countDown();
}
}


@VisibleForTesting
final Set<LongTokenRange> getFailedRanges()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -254,13 +255,13 @@ public void addStorageServiceListener(NotificationListener listener)
@Override
public List<String> getLiveNodes()
{
throw new UnsupportedOperationException();
return Collections.emptyList();
}

@Override
public List<String> getUnreachableNodes()
{
throw new UnsupportedOperationException();
return Collections.emptyList();
}

@Override
Expand All @@ -286,7 +287,7 @@ public void removeStorageServiceListener(NotificationListener listener)
@Override
public long liveDiskSpaceUsed(TableReference tableReference)
{
throw new UnsupportedOperationException();
return 5;
}

@Override
Expand All @@ -301,6 +302,12 @@ public double getPercentRepaired(TableReference tableReference)
throw new UnsupportedOperationException();
}

@Override
public String getNodeStatus()
{
return "NORMAL";
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,12 @@ public double getPercentRepaired(TableReference tableReference)
throw new UnsupportedOperationException();
}

@Override
public String getNodeStatus()
{
return "NORMAL";
}

public void notify(Notification notification)
{
myListener.handleNotification(notification, null);
Expand Down

0 comments on commit 287f3c1

Please sign in to comment.