Skip to content

Commit

Permalink
Fix PMD Violations and Rebase Change
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorCavichioli committed Oct 13, 2024
1 parent 32f5f93 commit 86220d3
Show file tree
Hide file tree
Showing 15 changed files with 294 additions and 39 deletions.
9 changes: 8 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

## Version 1.0.0 (Not yet Released)

* Generate Unique EcChronos ID #678
* Investigate Creation of RepairScheduler and ScheduleManager #714
* Implement ScheduledJobQueue for Prioritized Job Management and Execution - Issue #740
* Implement RepairGroup Class for Managing and Executing Repair Tasks - Issue #738
* Create IncrementalRepairTask Class - Issue #736
* Implement ScheduledRepairJob, ScheduledJob and ScheduledTask for Automated Recurring Task Scheduling in Cassandra - Issue #737
* Create RepairTask Abstract Class to Handle Repair Operations - Issue #717
* Create ReplicationState and ReplicationStateImpl Class for Managing Token-to-Replicas Mapping - Issue #722
* Generate Unique EcChronos ID - Issue #678
* Create RepairConfiguration class for repair configurations - Issue #716
* Create DistributedJmxProxy and DistributedJmxProxyFactory - Issue #715
* Create a New Maven Module "utils" for Common Code Reuse - Issue #720
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private Map<UUID, Node> generateNodesMap(final List<Node> nodes)
return nodesMap;
}

private static class NoOpRepairMetrics implements TableRepairMetrics
private static final class NoOpRepairMetrics implements TableRepairMetrics
{

@Override
Expand Down
4 changes: 2 additions & 2 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ connection:
## Initial contact points list for ecChronos
## to establish first connection with Cassandra.
contactPoints:
- host: 172.18.0.2
- host: 127.0.0.1
port: 9042
- host: 172.18.0.5
- host: 127.0.0.2
port: 9042
## Configuration to define datacenters for ecchronos
## to connect to, datacenterAware enable means that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,53 @@

import java.util.concurrent.atomic.AtomicLong;

/**
* A utility class for logging messages with throttling capabilities.
* <p>
* This class ensures that a specific log message is only logged at specified intervals,
* preventing log flooding for high-frequency events.
* </p>
*/
public class ThrottledLogMessage
{
private final String myMessage;
private final long myIntervalNanos;
private final AtomicLong myLastLogTime;

/**
* Constructs a ThrottledLogMessage with the specified message and interval.
*
* @param message the log message to be throttled. Must not be {@code null}.
* @param intervalNanos the minimum interval (in nanoseconds) between consecutive log messages. Must be greater than zero.
* @throws IllegalArgumentException if {@code message} is {@code null} or {@code intervalNanos} is less than or equal to zero.
*/
public ThrottledLogMessage(final String message, final long intervalNanos)
{
myMessage = message;
myIntervalNanos = intervalNanos;
myLastLogTime = new AtomicLong(Long.MIN_VALUE);
}

/**
* Checks whether the logging of the message is allowed based on the specified time.
*
* @param timeInNanos the current time in nanoseconds.
* @return {@code true} if the message can be logged; {@code false} otherwise.
*/
private boolean isAllowedToLog(final long timeInNanos)
{
long lastLogTime = myLastLogTime.get();
return timeInNanos >= lastLogTime && myLastLogTime.compareAndSet(lastLogTime, timeInNanos + myIntervalNanos);
}

/**
* Logs an informational message if the logging is allowed based on the throttling interval.
*
* @param logger the logger to log the message to. Must not be {@code null}.
* @param timeInMs the current time in milliseconds.
* @param objects optional parameters to be included in the log message.
* @throws NullPointerException if {@code logger} is {@code null}.
*/
public final void info(final Logger logger, final long timeInMs, final Object... objects)
{
if (isAllowedToLog(timeInMs))
Expand All @@ -45,6 +73,14 @@ public final void info(final Logger logger, final long timeInMs, final Object...
}
}

/**
* Logs a warning message if the logging is allowed based on the throttling interval.
*
* @param logger the logger to log the message to. Must not be {@code null}.
* @param timeInMs the current time in milliseconds.
* @param objects optional parameters to be included in the log message.
* @throws NullPointerException if {@code logger} is {@code null}.
*/
public final void warn(final Logger logger, final long timeInMs, final Object... objects)
{
if (isAllowedToLog(timeInMs))
Expand All @@ -53,6 +89,14 @@ public final void warn(final Logger logger, final long timeInMs, final Object...
}
}

/**
* Logs an error message if the logging is allowed based on the throttling interval.
*
* @param logger the logger to log the message to. Must not be {@code null}.
* @param timeInMs the current time in milliseconds.
* @param objects optional parameters to be included in the log message.
* @throws NullPointerException if {@code logger} is {@code null}.
*/
public final void error(final Logger logger, final long timeInMs, final Object... objects)
{
if (isAllowedToLog(timeInMs))
Expand All @@ -62,3 +106,4 @@ public final void error(final Logger logger, final long timeInMs, final Object..
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
/**
* Logger that throttles log messages per interval.
* A log message uniqueness is based on the string message.
*
* This logger is thread safe.
*/
public class ThrottlingLogger
Expand All @@ -32,24 +31,50 @@ public class ThrottlingLogger
private final Logger myLogger;
private final long myIntervalNanos;

/**
* Constructs a ThrottlingLogger with the specified logger, interval, and time unit.
*
* @param logger the logger to which the messages will be sent. Must not be {@code null}.
* @param interval the interval duration for throttling messages.
* @param timeUnit the time unit for the interval duration. Must not be {@code null}.
* @throws NullPointerException if {@code logger} or {@code timeUnit} is {@code null}.
*/
public ThrottlingLogger(final Logger logger, final long interval, final TimeUnit timeUnit)
{
myLogger = logger;
myIntervalNanos = timeUnit.toNanos(interval);
}

/**
* Logs an informational message, throttled according to the specified interval.
*
* @param message the message to log. Must not be {@code null}.
* @param objects optional parameters to include in the log message.
*/
public final void info(final String message, final Object... objects)
{
ThrottledLogMessage throttledLogMessage = getThrottledLogMessage(message);
throttledLogMessage.info(myLogger, System.nanoTime(), objects);
}

/**
* Logs a warning message, throttled according to the specified interval.
*
* @param message the message to log. Must not be {@code null}.
* @param objects optional parameters to include in the log message.
*/
public final void warn(final String message, final Object... objects)
{
ThrottledLogMessage throttledLogMessage = getThrottledLogMessage(message);
throttledLogMessage.warn(myLogger, System.nanoTime(), objects);
}

/**
* Logs an error message, throttled according to the specified interval.
*
* @param message the message to log. Must not be {@code null}.
* @param objects optional parameters to include in the log message.
*/
public final void error(final String message, final Object... objects)
{
ThrottledLogMessage throttledLogMessage = getThrottledLogMessage(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,24 @@ public class CassandraMetrics implements Closeable
private final LoadingCache<MetricsKey, CassandraMetric> myCache;
private final DistributedJmxProxyFactory myJmxProxyFactory;

/**
* Constructs a CassandraMetrics instance with default cache refresh and expiry times.
*
* @param jmxProxyFactory the factory used to create connections to distributed JMX proxies. Must not be {@code null}.
*/
public CassandraMetrics(final DistributedJmxProxyFactory jmxProxyFactory)
{
this(jmxProxyFactory, Duration.ofSeconds(DEFAULT_CACHE_REFRESH_TIME_SECONDS),
Duration.ofMinutes(DEFAULT_CACHE_EXPIRY_TIME_MINUTES));
}

/**
* Constructs a CassandraMetrics instance.
*
* @param jmxProxyFactory the factory used to create connections to distributed JMX proxies. Must not be {@code null}.
* @param refreshAfter the duration after which the cache will refresh its entries. Must not be {@code null}.
* @param expireAfter the duration after which the cache entries will expire after access. Must not be {@code null}.
*/
public CassandraMetrics(final DistributedJmxProxyFactory jmxProxyFactory, final Duration refreshAfter,
final Duration expireAfter)
{
Expand Down Expand Up @@ -87,6 +100,7 @@ final void refreshCache(final UUID nodeID, final TableReference tableReference)

/**
* Return max repaired at for a table.
* @param nodeID the node ID
* @param tableReference The table
* @return Timestamp or 0 if not available.
*/
Expand All @@ -107,6 +121,7 @@ public long getMaxRepairedAt(final UUID nodeID, final TableReference tableRefere

/**
* Return percent repaired for a table.
* @param nodeID the node ID
* @param tableReference The table
* @return Percent repaired or 0 if not available.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class DefaultRepairConfigurationProvider extends NodeStateListenerBase im
private Function<TableReference, Set<RepairConfiguration>> myRepairConfigurationFunction;
private TableReferenceFactory myTableReferenceFactory;

/**
* Default constructor.
*/
public DefaultRepairConfigurationProvider()
{
//NOOP
Expand Down Expand Up @@ -257,6 +260,10 @@ private boolean isTableIgnored(final TableMetadata table, final boolean ignore)
&& "org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy".equals(compaction.get("class"));
}

/**
* Create Builder for DefaultRepairConfigurationProvider.
* @return Builder the Builder instance for the class.
*/
public static Builder newBuilder()
{
return new Builder();
Expand Down Expand Up @@ -459,6 +466,9 @@ private void setupConfiguration()
}
}

/**
* Builder for DefaultRepairConfigurationProvider.
*/
public static class Builder
{
private CqlSession mySession;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class used to construct repair groups.
*/
public class RepairGroup extends ScheduledTask
{
private static final Logger LOG = LoggerFactory.getLogger(RepairGroup.class);
Expand All @@ -43,6 +46,12 @@ public class RepairGroup extends ScheduledTask
private final TableRepairMetrics myTableRepairMetrics;
private final List<TableRepairPolicy> myRepairPolicies;

/**
* Constructs an IncrementalRepairTask for a specific node and table.
*
* @param priority the priority for job creation.
* @param builder the Builder to construct RepairGroup.
*/
public RepairGroup(final int priority, final Builder builder)
{
super(priority);
Expand Down Expand Up @@ -123,7 +132,8 @@ public String toString()
/**
* Get repair tasks.
*
* @return Collection<RepairTask>
* @param nodeID the Node id.
* @return a Collection of RepairTask
*/
@VisibleForTesting
public Collection<RepairTask> getRepairTasks(final UUID nodeID)
Expand All @@ -139,11 +149,19 @@ public Collection<RepairTask> getRepairTasks(final UUID nodeID)
return tasks;
}

/**
* Create instance of Builder to construct RepairGroup.
*
* @return Builder
*/
public static Builder newBuilder()
{
return new Builder();
}

/**
* Builder used to construct RepairGroup.
*/
public static class Builder
{
private TableReference myTableReference;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Abstract Class used to represent repair tasks.
*/
public abstract class RepairTask implements NotificationListener
{
private static final Logger LOG = LoggerFactory.getLogger(RepairTask.class);
Expand All @@ -65,6 +68,15 @@ public abstract class RepairTask implements NotificationListener
private volatile Set<LongTokenRange> myFailedRanges = new HashSet<>();
private volatile Set<LongTokenRange> mySuccessfulRanges = new HashSet<>();

/**
* Constructs a RepairTask for the specified node and table with the given repair configuration and metrics.
*
* @param currentNodeID the UUID of the current node where the repair task is running. Must not be {@code null}.
* @param jmxProxyFactory the factory to create connections to distributed JMX proxies. Must not be {@code null}.
* @param tableReference the reference to the table that is being repaired. Must not be {@code null}.
* @param repairConfiguration the configuration specifying how the repair task should be executed. Must not be {@code null}.
* @param tableRepairMetrics the metrics associated with table repairs for monitoring and tracking purposes. May be {@code null}.
*/
protected RepairTask(
final UUID currentNodeID,
final DistributedJmxProxyFactory jmxProxyFactory,
Expand Down Expand Up @@ -333,6 +345,9 @@ protected void onRangeFinished(final LongTokenRange range, final RepairStatus re
}
}

/**
* Enum used to provide Event Progress for RepairTask.
*/
public enum ProgressEventType
{
/**
Expand Down Expand Up @@ -413,7 +428,13 @@ public void run()
}
}


/**
* Returns the set of token ranges that have failed during the repair task.
*
* <p>This method is primarily intended for testing purposes.</p>
*
* @return a set of {@link LongTokenRange} representing the failed token ranges.
*/
@VisibleForTesting
protected final Set<LongTokenRange> getFailedRanges()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@

import java.util.concurrent.TimeUnit;

/**
* Class used to run Incremental Repairs in Cassandra.
*/
public class IncrementalRepairJob extends ScheduledRepairJob
{
private static final Logger LOG = LoggerFactory.getLogger(IncrementalRepairJob.class);
Expand Down Expand Up @@ -185,6 +188,9 @@ public final int hashCode()
return Objects.hash(super.hashCode(), myReplicationState, myCassandraMetrics, myNode);
}

/**
* Builder class to construct IncrementalRepairJob.
*/
@SuppressWarnings("VisibilityModifier")
public static class Builder
{
Expand Down
Loading

0 comments on commit 86220d3

Please sign in to comment.