Skip to content

Commit

Permalink
Retry Policy for Jmx Connection #700 (#710)
Browse files Browse the repository at this point in the history
- Retry Policy for jmx connection
- Read all nodes statuses from nodes_sync table
- Filter out all node with unavailable status
- Retry to attempt jmx connection for unavailable nodes
- If connection successful add connection and change
status to available in table.
- If connection failed after given retry attempts, change
status to unreachable in table.
- This is schedule service that would execute after given
fix delay. By default it would run after every 24 hour and
scan the table.

Co-authored-by: sajid riaz <sajid.riaz@ericsson.com>
  • Loading branch information
SajidRiaz138 and sajid riaz authored Sep 10, 2024
1 parent bdb23fd commit def5330
Show file tree
Hide file tree
Showing 16 changed files with 995 additions and 25 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Version 1.0.0 (Not yet Release)

* Retry Policy for Jmx Connection - Issue #700
* Update Architecture and Tests Documentations to Add the Agent Features and The cassandra-test-image - Issue #707
* Enhance Test Infrastructure by Adding Cassandra-Test-Image Module With Multi-Datacenter Cluster and Abstract Integration Test Class - Issue #706
* Investigate Introduction of testContainers - Issue #682
Expand All @@ -10,4 +11,4 @@
* Create JMXAgentConfig to add Hosts in JMX Session Through ecc.yml - Issue #675
* Expose AgentNativeConnectionProvider on Connection and Application Module - Issue #673
* Create DatacenterAwareConfig to add Hosts in CQL Session Through ecc.yml - Issue #671
* Create Initial project Structure for Agent - Issue #695
* Create Initial project Structure for Agent - Issue #695
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.function.Supplier;

public class DistributedJmxConnection extends Connection<DistributedJmxConnectionProvider>
{
private RetryPolicyConfig myRetryPolicyConfig = new RetryPolicyConfig();

public DistributedJmxConnection()
{
try
Expand All @@ -35,16 +39,28 @@ public DistributedJmxConnection()
}
}

@JsonProperty("retryPolicy")
public final RetryPolicyConfig getRetryPolicyConfig()
{
return myRetryPolicyConfig;
}

@JsonProperty("retryPolicy")
public final void setRetryPolicyConfig(final RetryPolicyConfig retryPolicyConfig)
{
myRetryPolicyConfig = retryPolicyConfig;
}

/**
* @return
*/
@Override
protected Class<?>[] expectedConstructor()
{
return new Class<?>[] {
Supplier.class,
DistributedNativeConnectionProvider.class,
EccNodesSync.class
Supplier.class,
DistributedNativeConnectionProvider.class,
EccNodesSync.class
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.connection;

import java.util.Locale;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.annotation.JsonProperty;

public final class RetryPolicyConfig
{
public RetryPolicyConfig()
{
}

private static final int DEFAULT_MAX_ATTEMPTS = 5;
private static final long DEFAULT_DELAY_IN_MS = 5000;
private static final long DEFAULT_MAX_DELAY_IN_MS = 30000;
private static final long DEFAULT_INITIAL_DELAY_IN_MS = 86400000;
private static final long DEFAULT_FIXED_DELAY_IN_MS = 86400000;
private static final TimeUnit DEFAULT_TIME_UNIT_IN_SECONDS = TimeUnit.SECONDS;
private RetryPolicyConfig.RetryDelay myRetryDelay = new RetryPolicyConfig.RetryDelay();
private RetryPolicyConfig.RetrySchedule myRetrySchedule = new RetryPolicyConfig.RetrySchedule();

@JsonProperty("maxAttempts")
private Integer myMaxAttempts = DEFAULT_MAX_ATTEMPTS;

@JsonProperty ("maxAttempts")
public Integer getMaxAttempts()
{
return myMaxAttempts;
}

@JsonProperty("maxAttempts")
public void setMaxAttempts(final Integer maxAttempts)
{
if (maxAttempts != null)
{
this.myMaxAttempts = maxAttempts;
}
}

@JsonProperty("delay")
public void setRetryDelay(final RetryDelay retryDelay)
{
myRetryDelay = retryDelay;
}

@JsonProperty("delay")
public RetryDelay getRetryDelay()
{
return myRetryDelay;
}

@JsonProperty("retrySchedule")
public RetrySchedule getRetrySchedule()
{
return myRetrySchedule;
}

@JsonProperty("retrySchedule")
public void setRetrySchedule(final RetrySchedule retrySchedule)
{
myRetrySchedule = retrySchedule;
}

private static long convertToMillis(final Long value, final TimeUnit unit)
{
return unit.toMillis(value);
}

/**
* Configuration for retry delay parameter.
*/
public static final class RetryDelay
{
public RetryDelay()
{

}

@JsonProperty("start")
private long myDelay = DEFAULT_DELAY_IN_MS;

@JsonProperty("max")
private long myMaxDelay = DEFAULT_MAX_DELAY_IN_MS;

@JsonProperty("unit")
private TimeUnit myTimeUnit = DEFAULT_TIME_UNIT_IN_SECONDS;

@JsonProperty("start")
public long getStartDelay()
{
return myDelay;
}

@JsonProperty("start")
public void setStartDelay(final Long delay)
{
if (delay != null)
{
long convertedDelay = convertToMillis(delay, myTimeUnit);
if (convertedDelay > myMaxDelay)
{
throw new IllegalArgumentException("Start delay cannot be greater than max delay.");
}
this.myDelay = convertToMillis(delay, myTimeUnit);
}
}

@JsonProperty("max")
public long getMaxDelay()
{
return myMaxDelay;
}

@JsonProperty("max")
public void setMaxDelay(final Long maxDelay)
{
if (maxDelay != null)
{
long convertedMaxDelay = convertToMillis(maxDelay, myTimeUnit);
if (convertedMaxDelay < myDelay)
{
throw new IllegalArgumentException("Max delay cannot be less than start delay.");
}
this.myMaxDelay = convertToMillis(maxDelay, myTimeUnit);
}
}

@JsonProperty("unit")
public TimeUnit getUnit()
{
return myTimeUnit;
}

@JsonProperty("unit")
public void setTimeUnit(final String unit)
{
if (unit != null && !unit.isBlank())
{
myTimeUnit = TimeUnit.valueOf(unit.toUpperCase(Locale.US));
}
}
}

public static final class RetrySchedule
{
public RetrySchedule()
{

}

@JsonProperty("initialDelay")
private long myInitialDelay = DEFAULT_INITIAL_DELAY_IN_MS;

@JsonProperty("fixedDelay")
private long myFixedDelay = DEFAULT_FIXED_DELAY_IN_MS;

@JsonProperty("unit")
private TimeUnit myTimeUnit = DEFAULT_TIME_UNIT_IN_SECONDS;

@JsonProperty("initialDelay")
public long getInitialDelay()
{
return myInitialDelay;
}

@JsonProperty("initialDelay")
public void setInitialDelay(final Long initialDelay)
{
if (initialDelay != null)
{
this.myInitialDelay = convertToMillis(initialDelay, myTimeUnit);
}
}

@JsonProperty("fixedDelay")
public long getFixedDelay()
{
return myFixedDelay;
}

@JsonProperty("fixedDelay")
public void setFixedDelay(final Long fixedDelay)
{
if (fixedDelay != null)
{
this.myFixedDelay = convertToMillis(fixedDelay, myTimeUnit);
}
}

@JsonProperty("unit")
public TimeUnit getUnit()
{
return myTimeUnit;
}

@JsonProperty("unit")
public void setTimeUnit(final String unit)
{
if (unit != null && !unit.isBlank())
{
myTimeUnit = TimeUnit.valueOf(unit.toUpperCase(Locale.US));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ public DistributedJmxConnectionProvider distributedJmxConnectionProvider(
jmxSecurity::get, distributedNativeConnectionProvider, eccNodesSync);
}

@Bean
public RetrySchedulerService retrySchedulerService(final Config config,
final DistributedJmxConnectionProvider jmxConnectionProvider,
final EccNodesSync eccNodesSync,
final DistributedNativeConnectionProvider nativeConnectionProvider)
{
return new RetrySchedulerService(eccNodesSync, config, jmxConnectionProvider, nativeConnectionProvider);
}

private Security getSecurityConfig() throws ConfigurationException
{
return ConfigurationHelper.DEFAULT_INSTANCE.getConfiguration(SECURITY_FILE, Security.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.spring;

import com.ericsson.bss.cassandra.ecchronos.application.config.connection.RetryPolicyConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RetryBackoffStrategy
{
private static final Logger LOG = LoggerFactory.getLogger(RetryBackoffStrategy.class);
private static final int ONE_SECOND_IN_MS = 1000;
private static final int RETRY_DELAY_MULTIPLIER = 2;
private final RetryPolicyConfig myRetryPolicyConfig;
private final RetryPolicyConfig.RetrySchedule myRetrySchedule;
private final RetryPolicyConfig.RetryDelay myRetryDelay;

public RetryBackoffStrategy(final RetryPolicyConfig retryPolicyConfig)
{
myRetryPolicyConfig = retryPolicyConfig;
myRetrySchedule = retryPolicyConfig.getRetrySchedule();
myRetryDelay = retryPolicyConfig.getRetryDelay();
}

public long getInitialDelay()
{
return myRetrySchedule.getInitialDelay();
}

public long getFixedDelay()
{
return myRetrySchedule.getFixedDelay();
}

public int getMaxAttempts()
{
return myRetryPolicyConfig.getMaxAttempts();
}

public long calculateDelay(final int attempt)
{
long baseDelay = myRetryDelay.getStartDelay();
long calculatedDelay = baseDelay * ((long) attempt * RETRY_DELAY_MULTIPLIER);
LOG.debug("Calculated delay for attempt {}: {} ms", attempt, calculatedDelay);
return Math.min(calculatedDelay, myRetryDelay.getMaxDelay() * ONE_SECOND_IN_MS);
}

public void sleepBeforeNextRetry(final long delayMillis)
{
try
{
Thread.sleep(delayMillis);
}
catch (InterruptedException e)
{
LOG.error("Interrupted during sleep between retry attempts", e);
Thread.currentThread().interrupt();
}
}
}
Loading

0 comments on commit def5330

Please sign in to comment.