diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java index 7ad8de2e1..e1fac2dbc 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java @@ -21,19 +21,19 @@ public class ConnectionConfig private DistributedNativeConnection myCqlConnection = new DistributedNativeConnection(); private DistributedJmxConnection myJmxConnection = new DistributedJmxConnection(); - @JsonProperty("cql") + @JsonProperty ("cql") public final DistributedNativeConnection getCqlConnection() { return myCqlConnection; } - @JsonProperty("jmx") + @JsonProperty ("jmx") public final DistributedJmxConnection getJmxConnection() { return myJmxConnection; } - @JsonProperty("cql") + @JsonProperty ("cql") public final void setCqlConnection(final DistributedNativeConnection cqlConnection) { if (cqlConnection != null) @@ -42,7 +42,7 @@ public final void setCqlConnection(final DistributedNativeConnection cqlConnecti } } - @JsonProperty("jmx") + @JsonProperty ("jmx") public final void setJmxConnection(final DistributedJmxConnection jmxConnection) { if (jmxConnection != null) diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java index 1e4548791..4ec831a48 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java @@ -19,10 +19,14 
@@ import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import com.fasterxml.jackson.annotation.JsonProperty; + import java.util.function.Supplier; public class DistributedJmxConnection extends Connection { + private RetryPolicyConfig myRetryPolicyConfig = new RetryPolicyConfig(); + public DistributedJmxConnection() { try @@ -35,6 +39,18 @@ public DistributedJmxConnection() } } + @JsonProperty ("retryPolicy") + public final RetryPolicyConfig getRetryPolicyConfig() + { + return myRetryPolicyConfig; + } + + @JsonProperty ("retryPolicy") + public final void setRetryPolicyConfig(final RetryPolicyConfig retryPolicyConfig) + { + myRetryPolicyConfig = retryPolicyConfig; + } + /** * @return */ @@ -42,9 +58,9 @@ public DistributedJmxConnection() protected Class[] expectedConstructor() { return new Class[] { - Supplier.class, - DistributedNativeConnectionProvider.class, - EccNodesSync.class + Supplier.class, + DistributedNativeConnectionProvider.class, + EccNodesSync.class }; } } diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java new file mode 100644 index 000000000..e4aa59012 --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java @@ -0,0 +1,157 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.application.config.connection;
+
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Retry policy settings, bound from the {@code retryPolicy} configuration section.
+ *
+ * All delays are stored and returned in milliseconds. Values handed to the setters are interpreted in the
+ * configured unit ({@code milliseconds}, {@code seconds} or {@code minutes}, case-insensitive; {@code seconds}
+ * unless changed).
+ */
+public final class RetryPolicyConfig
+{
+
+    private static final int DEFAULT_MAX_ATTEMPTS = 5;
+    // Every default delay below is expressed in milliseconds, like the fields it initialises.
+    private static final long DEFAULT_DELAY = 5000;
+    private static final long DEFAULT_MAX_DELAY = 30000;
+    // 86400 seconds (the value shipped in ecc.yml) converted to the millisecond representation used by the getters.
+    private static final long DEFAULT_INITIAL_DELAY = TimeUnit.SECONDS.toMillis(86400);
+    private static final long DEFAULT_FIXED_DELAY = TimeUnit.SECONDS.toMillis(86400);
+
+    @JsonProperty ("maxAttempts")
+    private Integer myMaxAttempts = DEFAULT_MAX_ATTEMPTS;
+    @JsonProperty ("delay")
+    private long myDelay = DEFAULT_DELAY;
+    @JsonProperty ("maxDelay")
+    private long myMaxDelay = DEFAULT_MAX_DELAY;
+    @JsonProperty ("unit")
+    private String myUnit = "seconds";
+    @JsonProperty ("initialDelay")
+    private long myInitialDelay = DEFAULT_INITIAL_DELAY;
+    @JsonProperty ("fixedDelay")
+    private long myFixedDelay = DEFAULT_FIXED_DELAY;
+
+    public RetryPolicyConfig()
+    {
+    }
+
+    /**
+     * Creates a fully specified retry policy.
+     *
+     * @param maxAttempts maximum number of connection attempts
+     * @param delay base delay between attempts, expressed in {@code unit}
+     * @param maxDelay upper bound for the delay between attempts, expressed in {@code unit}
+     * @param unit one of {@code milliseconds}, {@code seconds} or {@code minutes} (case-insensitive)
+     * @param initialDelay stored as given, without unit conversion
+     * @param fixedDelay stored as given, without unit conversion
+     * @throws IllegalArgumentException if {@code unit} is not a supported unit
+     */
+    public RetryPolicyConfig(final Integer maxAttempts,
+                             final Integer delay,
+                             final Integer maxDelay,
+                             final String unit,
+                             final long initialDelay,
+                             final long fixedDelay)
+    {
+        this.myMaxAttempts = maxAttempts;
+        this.myDelay = convertToMillis(delay, unit);
+        this.myMaxDelay = convertToMillis(maxDelay, unit);
+        this.myUnit = unit;
+        // NOTE(review): unlike setInitialDelay/setFixedDelay these two are not converted from 'unit';
+        // presumably callers pass milliseconds here - confirm against the call sites.
+        this.myInitialDelay = initialDelay;
+        this.myFixedDelay = fixedDelay;
+    }
+
+    @JsonProperty ("maxAttempts")
+    public Integer getMaxAttempts()
+    {
+        return myMaxAttempts;
+    }
+
+    /**
+     * @param maxAttempts maximum number of attempts; {@code null} keeps the current value
+     */
+    @JsonProperty ("maxAttempts")
+    public void setMaxAttempts(final Integer maxAttempts)
+    {
+        if (maxAttempts != null)
+        {
+            this.myMaxAttempts = maxAttempts;
+        }
+    }
+
+    /**
+     * @return the base delay between attempts in milliseconds
+     */
+    @JsonProperty ("delay")
+    public long getDelay()
+    {
+        return myDelay;
+    }
+
+    /**
+     * @param delay base delay expressed in the current unit; {@code null} keeps the current value
+     */
+    @JsonProperty ("delay")
+    public void setDelay(final Integer delay)
+    {
+        if (delay != null)
+        {
+            this.myDelay = convertToMillis(delay, myUnit);
+        }
+    }
+
+    /**
+     * @return the maximum delay between attempts in milliseconds
+     */
+    @JsonProperty ("maxDelay")
+    public long getMaxDelay()
+    {
+        return myMaxDelay;
+    }
+
+    /**
+     * @param maxDelay maximum delay expressed in the current unit; {@code null} keeps the current value
+     */
+    @JsonProperty ("maxDelay")
+    public void setMaxDelay(final Integer maxDelay)
+    {
+        if (maxDelay != null)
+        {
+            this.myMaxDelay = convertToMillis(maxDelay, myUnit);
+        }
+    }
+
+    @JsonProperty ("unit")
+    public String getUnit()
+    {
+        return myUnit;
+    }
+
+    /**
+     * Changes the unit and re-interprets every stored delay in it, so that the outcome does not depend on
+     * whether the unit or the delay values are set first.
+     *
+     * @param unit one of {@code milliseconds}, {@code seconds} or {@code minutes} (case-insensitive);
+     *             {@code null} keeps the current unit
+     * @throws IllegalArgumentException if {@code unit} is not supported; no state is modified in that case
+     */
+    @JsonProperty ("unit")
+    public void setUnit(final String unit)
+    {
+        if (unit == null)
+        {
+            return;
+        }
+        // Resolve both units before touching any field so that an unsupported unit leaves this object unchanged.
+        TimeUnit newUnit = parseUnit(unit);
+        TimeUnit previousUnit = parseUnit(myUnit);
+        this.myDelay = rescale(myDelay, previousUnit, newUnit);
+        this.myMaxDelay = rescale(myMaxDelay, previousUnit, newUnit);
+        this.myInitialDelay = rescale(myInitialDelay, previousUnit, newUnit);
+        this.myFixedDelay = rescale(myFixedDelay, previousUnit, newUnit);
+        this.myUnit = unit;
+    }
+
+    /**
+     * @return the initial delay in milliseconds
+     */
+    @JsonProperty ("initialDelay")
+    public long getInitialDelay()
+    {
+        return myInitialDelay;
+    }
+
+    /**
+     * @param initialDelay initial delay expressed in the current unit; {@code null} keeps the current value
+     */
+    @JsonProperty ("initialDelay")
+    public void setInitialDelay(final Integer initialDelay)
+    {
+        if (initialDelay != null)
+        {
+            this.myInitialDelay = convertToMillis(initialDelay, myUnit);
+        }
+    }
+
+    /**
+     * @return the fixed delay in milliseconds
+     */
+    @JsonProperty ("fixedDelay")
+    public long getFixedDelay()
+    {
+        return myFixedDelay;
+    }
+
+    /**
+     * @param fixedDelay fixed delay expressed in the current unit; {@code null} keeps the current value
+     */
+    @JsonProperty ("fixedDelay")
+    public void setFixedDelay(final Integer fixedDelay)
+    {
+        if (fixedDelay != null)
+        {
+            this.myFixedDelay = convertToMillis(fixedDelay, myUnit);
+        }
+    }
+
+    /**
+     * Maps a configured unit name to the matching {@link TimeUnit}.
+     */
+    private static TimeUnit parseUnit(final String unit)
+    {
+        return switch (unit.toLowerCase(Locale.ENGLISH))
+        {
+            case "milliseconds" -> TimeUnit.MILLISECONDS;
+            case "seconds" -> TimeUnit.SECONDS;
+            case "minutes" -> TimeUnit.MINUTES;
+            default -> throw new IllegalArgumentException("Unsupported time unit: " + unit);
+        };
+    }
+
+    /**
+     * Converts a value expressed in {@code unit} to milliseconds.
+     */
+    private static long convertToMillis(final long value, final String unit)
+    {
+        return parseUnit(unit).toMillis(value);
+    }
+
+    /**
+     * Takes a millisecond value that was derived from {@code from} and re-derives it as if the same raw
+     * number had been given in {@code to}.
+     */
+    private static long rescale(final long millis, final TimeUnit from, final TimeUnit to)
+    {
+        return to.toMillis(from.convert(millis, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Computes the delay for a given attempt count: the base delay multiplied by {@code count}, limited to
+     * the maximum delay. A maximum delay of 0 (or less) disables the limit.
+     *
+     * @param count the attempt count to scale the base delay with
+     * @return the delay in milliseconds
+     */
+    public long currentDelay(final Integer count)
+    {
+        long currentDelay = myDelay * count;
+        if (myMaxDelay > 0 && currentDelay > myMaxDelay)
+        {
+            currentDelay = myMaxDelay;
+        }
+        return currentDelay;
+    }
+}
diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java
index 57e449bfb..d0a8ef5d5 100644
--- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java
+++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java
@@ -208,6 +208,15 @@ public DistributedJmxConnectionProvider distributedJmxConnectionProvider(
                 jmxSecurity::get, distributedNativeConnectionProvider, eccNodesSync);
     }
 
+    @Bean
+    public RetrySchedulerService retrySchedulerService(final Config config,
+                                                       final DistributedJmxConnectionProvider jmxConnectionProvider,
+                                                       final EccNodesSync eccNodesSync,
+                                                       final DistributedNativeConnectionProvider nativeConnectionProvider)
+    {
+        return new RetrySchedulerService(eccNodesSync, config, jmxConnectionProvider, nativeConnectionProvider);
+    }
+
     private Security getSecurityConfig() throws ConfigurationException
     {
         return ConfigurationHelper.DEFAULT_INSTANCE.getConfiguration(SECURITY_FILE, Security.class);
diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryBackoffStrategy.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryBackoffStrategy.java
new file mode 100644
index 000000000..ba3427acb
--- /dev/null
+++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryBackoffStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.application.spring;
+
+import com.ericsson.bss.cassandra.ecchronos.application.config.connection.RetryPolicyConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Derives retry timing from a {@link RetryPolicyConfig} and performs the wait between attempts.
+ *
+ * Every delay handled by this class is in milliseconds, which is what the {@link RetryPolicyConfig}
+ * getters return.
+ */
+public final class RetryBackoffStrategy
+{
+    private static final Logger LOG = LoggerFactory.getLogger(RetryBackoffStrategy.class);
+    private final RetryPolicyConfig myRetryPolicyConfig;
+
+    public RetryBackoffStrategy(final RetryPolicyConfig retryPolicyConfig)
+    {
+        this.myRetryPolicyConfig = retryPolicyConfig;
+    }
+
+    /**
+     * @return the configured initial delay in milliseconds
+     */
+    public long getInitialDelay()
+    {
+        return myRetryPolicyConfig.getInitialDelay();
+    }
+
+    /**
+     * @return the configured fixed delay in milliseconds
+     */
+    public long getFixedDelay()
+    {
+        return myRetryPolicyConfig.getFixedDelay();
+    }
+
+    /**
+     * @return the configured maximum number of attempts
+     */
+    public int getMaxAttempts()
+    {
+        return myRetryPolicyConfig.getMaxAttempts();
+    }
+
+    /**
+     * Calculates how long to wait after the given attempt: {@code delay * (attempt * 2)}, limited to the
+     * configured maximum delay. A maximum delay of 0 (or less) disables the limit.
+     *
+     * @param attempt the 1-based attempt number that just failed
+     * @return the delay in milliseconds
+     */
+    public long calculateDelay(final int attempt)
+    {
+        long baseDelay = myRetryPolicyConfig.getDelay();
+        long calculatedDelay = baseDelay * (attempt * 2L);
+        LOG.debug("Calculated delay for attempt {}: {} ms", attempt, calculatedDelay);
+
+        // getMaxDelay() is already in milliseconds, so it is compared as-is; scaling it by 1000 again
+        // would turn a 30 second cap into a cap of more than eight hours.
+        long maxDelay = myRetryPolicyConfig.getMaxDelay();
+        if (maxDelay <= 0)
+        {
+            return calculatedDelay;
+        }
+        return Math.min(calculatedDelay, maxDelay);
+    }
+
+    /**
+     * Blocks the calling thread for the given time. If the thread is interrupted while waiting, the
+     * interruption is logged and the thread's interrupt flag is restored before returning.
+     *
+     * @param delayMillis the time to wait in milliseconds
+     */
+    public void sleepBeforeNextRetry(final long delayMillis)
+    {
+        try
+        {
+            Thread.sleep(delayMillis);
+        }
+        catch (InterruptedException e)
+        {
+            LOG.error("Interrupted during sleep between retry attempts", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git 
a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java new file mode 100644 index 000000000..4fe510e9d --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java @@ -0,0 +1,220 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.ericsson.bss.cassandra.ecchronos.application.spring; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.application.config.Config; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.data.enums.NodeStatus; +import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import com.google.common.annotations.VisibleForTesting; +import jakarta.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.stereotype.Service; + +import javax.management.remote.JMXConnector; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Service responsible for managing and scheduling retry attempts to reconnect to Cassandra nodes that have become unavailable. + *

+ * This service periodically checks the status of nodes and attempts to reconnect based on a configurable retry policy. + * It uses a scheduled executor service to perform retries at fixed intervals, with the intervals and the retry logic + * configurable via external configurations. + *

+ * + *

+ * The retry logic involves calculating the delay between attempts, which increases with each subsequent retry for a node. + * If the maximum number of retry attempts is reached, the node is marked as unreachable. + *

+ * + *

+ * This service is designed to run continuously in the background, adjusting its behavior based on the state of the + * Cassandra cluster and the provided configurations. It also ensures that resources are properly cleaned up on shutdown. + *

+ */
+
+@Service
+public final class RetrySchedulerService implements DisposableBean
+{
+    // NOTE(review): this class carries @Service while BeanConfigurator also declares a @Bean method that
+    // creates it; confirm that only one of the two registrations is picked up by the application context.
+
+    private static final Logger LOG = LoggerFactory.getLogger(RetrySchedulerService.class);
+    private static final String COLUMN_NODE_ID = "node_id";
+    private static final String COLUMN_NODE_STATUS = "node_status";
+    private static final int SCHEDULER_AWAIT_TERMINATION = 60;
+    private final EccNodesSync myEccNodesSync;
+    private final DistributedJmxConnectionProvider myJmxConnectionProvider;
+    private final DistributedNativeConnectionProvider myDistributedNativeConnectionProvider;
+    private final RetryBackoffStrategy myRetryBackoffStrategy;
+    private final ScheduledExecutorService myScheduler = Executors.newScheduledThreadPool(1);
+
+    public RetrySchedulerService(final EccNodesSync eccNodesSync,
+                                 final Config config,
+                                 final DistributedJmxConnectionProvider jmxConnectionProvider,
+                                 final DistributedNativeConnectionProvider distributedNativeConnectionProvider)
+    {
+        this.myEccNodesSync = eccNodesSync;
+        this.myJmxConnectionProvider = jmxConnectionProvider;
+        this.myDistributedNativeConnectionProvider = distributedNativeConnectionProvider;
+        this.myRetryBackoffStrategy = new RetryBackoffStrategy(config.getConnectionConfig().getJmxConnection().getRetryPolicyConfig());
+    }
+
+    /**
+     * Schedules the periodic retry cycle using the initial and fixed delay (both in milliseconds) from the
+     * retry policy.
+     */
+    @PostConstruct
+    public void startScheduler()
+    {
+        long initialDelay = myRetryBackoffStrategy.getInitialDelay();
+        long fixedDelay = myRetryBackoffStrategy.getFixedDelay();
+
+        LOG.info("Starting RetrySchedulerService with initialDelay={} ms and fixedDelay={} ms", initialDelay, fixedDelay);
+
+        myScheduler.scheduleWithFixedDelay(this::runRetryCycle, initialDelay, fixedDelay, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Scheduler entry point. A task that throws is never run again by scheduleWithFixedDelay, so failures
+     * of a single cycle are logged here instead of being propagated.
+     */
+    private void runRetryCycle()
+    {
+        try
+        {
+            retryNodes();
+        }
+        catch (RuntimeException e)
+        {
+            LOG.error("Retry cycle failed, the next cycle will run as scheduled", e);
+        }
+    }
+
+    /**
+     * Runs one retry cycle: looks up the nodes currently stored as UNAVAILABLE and tries to reconnect to
+     * each of them in turn.
+     */
+    @VisibleForTesting
+    void retryNodes()
+    {
+        LOG.debug("Retrying unavailable nodes");
+        List<Node> unavailableNodes = findUnavailableNodes();
+
+        if (unavailableNodes.isEmpty())
+        {
+            LOG.info("No unavailable nodes found.");
+            return;
+        }
+
+        for (Node node : unavailableNodes)
+        {
+            if (Thread.currentThread().isInterrupted())
+            {
+                LOG.warn("Retry cycle interrupted, skipping the remaining nodes");
+                return;
+            }
+            retryConnectionForNode(node);
+        }
+    }
+
+    /**
+     * Collects the driver nodes whose host id matches a row stored with status UNAVAILABLE. Rows without a
+     * matching driver node are ignored.
+     */
+    private List<Node> findUnavailableNodes()
+    {
+        List<Node> unavailableNodes = new ArrayList<>();
+        ResultSet resultSet = myEccNodesSync.getResultSet();
+
+        for (Row row : resultSet)
+        {
+            UUID nodeId = row.getUuid(COLUMN_NODE_ID);
+            String status = Objects.requireNonNull(row.getString(COLUMN_NODE_STATUS)).toUpperCase(Locale.ENGLISH);
+
+            if (NodeStatus.UNAVAILABLE.name().equals(status))
+            {
+                myDistributedNativeConnectionProvider.getNodes()
+                        .stream()
+                        .filter(node -> Objects.equals(node.getHostId(), nodeId))
+                        .findFirst()
+                        .ifPresent(unavailableNodes::add);
+            }
+        }
+
+        return unavailableNodes;
+    }
+
+    /**
+     * Tries to reconnect to the node up to the configured number of attempts, waiting between attempts.
+     * The node is marked UNREACHABLE when all attempts fail. If the thread is interrupted while waiting,
+     * the retries are abandoned and the node status is left untouched.
+     */
+    private void retryConnectionForNode(final Node node)
+    {
+        UUID nodeId = node.getHostId();
+        int maxAttempts = myRetryBackoffStrategy.getMaxAttempts();
+        for (int attempt = 1; attempt <= maxAttempts; attempt++)
+        {
+            if (tryReconnectToNode(node, nodeId, attempt))
+            {
+                return; // Successfully reconnected, exit method
+            }
+            // Only wait when another attempt follows; waiting after the last one would merely postpone
+            // marking the node and handling the remaining nodes.
+            if (attempt < maxAttempts && !waitBeforeNextAttempt(nodeId, attempt))
+            {
+                return;
+            }
+        }
+        markNodeUnreachable(node, nodeId);
+    }
+
+    /**
+     * Performs a single reconnect attempt and records the node as AVAILABLE on success.
+     *
+     * @return true if the node was reconnected
+     */
+    private boolean tryReconnectToNode(final Node node, final UUID nodeId, final int attempt)
+    {
+        LOG.info("Attempting to reconnect to node: {}, attempt: {}", nodeId, attempt);
+
+        if (establishConnectionToNode(node))
+        {
+            LOG.info("Successfully reconnected to node: {}", nodeId);
+            myEccNodesSync.updateNodeStatus(NodeStatus.AVAILABLE, node.getDatacenter(), nodeId);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Waits for the back-off delay that belongs to the failed attempt.
+     *
+     * @return false if the thread was interrupted and no further attempts should be made
+     */
+    private boolean waitBeforeNextAttempt(final UUID nodeId, final int attempt)
+    {
+        long delayMillis = myRetryBackoffStrategy.calculateDelay(attempt);
+        LOG.warn("Failed to reconnect to node: {}, next retry in {} ms", nodeId, delayMillis);
+        myRetryBackoffStrategy.sleepBeforeNextRetry(delayMillis);
+
+        // sleepBeforeNextRetry restores the interrupt flag when the sleep is interrupted.
+        if (Thread.currentThread().isInterrupted())
+        {
+            LOG.warn("Interrupted while waiting to retry node: {}, abandoning further attempts", nodeId);
+            return false;
+        }
+        return true;
+    }
+
+    private void markNodeUnreachable(final Node node, final UUID nodeId)
+    {
+        LOG.error("Max retry attempts reached for node: {}. Marking as UNREACHABLE.", nodeId);
+        myEccNodesSync.updateNodeStatus(NodeStatus.UNREACHABLE, node.getDatacenter(), nodeId);
+    }
+
+    /**
+     * Asks the JMX connection provider for a connector to the node and, if it is usable, stores it in the
+     * provider's connection map.
+     *
+     * @return true if a working connector was obtained
+     */
+    private boolean establishConnectionToNode(final Node node)
+    {
+        UUID nodeId = node.getHostId();
+        JMXConnector jmxConnector = myJmxConnectionProvider.getJmxConnector(nodeId);
+        boolean isConnected = jmxConnector != null && isConnected(jmxConnector);
+
+        if (isConnected)
+        {
+            myJmxConnectionProvider.getJmxConnections().put(nodeId, jmxConnector);
+            LOG.info("Node {} connected successfully.", nodeId);
+        }
+        else
+        {
+            LOG.warn("Failed to connect to node {}.", nodeId);
+        }
+
+        return isConnected;
+    }
+
+    /**
+     * Treats the connector as connected when its connection id can be read without an IOException.
+     */
+    private boolean isConnected(final JMXConnector jmxConnector)
+    {
+        try
+        {
+            jmxConnector.getConnectionId();
+            return true;
+        }
+        catch (IOException e)
+        {
+            LOG.error("Error while checking connection for JMX connector", e);
+            return false;
+        }
+    }
+
+    @Override
+    public void destroy()
+    {
+        LOG.info("Shutting down RetrySchedulerService...");
+        RetryServiceShutdownManager.shutdownExecutorService(myScheduler, SCHEDULER_AWAIT_TERMINATION, TimeUnit.SECONDS);
+        LOG.info("RetrySchedulerService shut down complete.");
+    }
+
+    @VisibleForTesting
+    ScheduledExecutorService getMyScheduler()
+    {
+        return myScheduler;
+    }
+}
diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryServiceShutdownManager.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryServiceShutdownManager.java
new file mode 100644
index 000000000..9187b4ca5
--- /dev/null
+++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryServiceShutdownManager.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.application.spring;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helper that shuts down a scheduled executor in two phases: an orderly shutdown first, a forced
+ * one if the executor has not terminated within the timeout.
+ */
+public final class RetryServiceShutdownManager
+{
+    private static final Logger LOG = LoggerFactory.getLogger(RetryServiceShutdownManager.class);
+
+    private RetryServiceShutdownManager()
+    {
+
+    }
+
+    /**
+     * Shuts the executor down and waits for it to terminate.
+     *
+     * The timeout applies to each waiting phase separately: once after the orderly shutdown and once more
+     * after the forced shutdown. If the calling thread is interrupted while waiting, the executor is forced
+     * to shut down and the thread's interrupt flag is restored.
+     *
+     * @param executorService the executor to shut down
+     * @param timeout how long to wait in each phase
+     * @param unit the unit of {@code timeout}
+     */
+    public static void shutdownExecutorService(final ScheduledExecutorService executorService,
+                                               final int timeout,
+                                               final TimeUnit unit)
+    {
+        executorService.shutdown();
+        try
+        {
+            boolean terminatedGracefully = executorService.awaitTermination(timeout, unit);
+            if (terminatedGracefully)
+            {
+                return;
+            }
+
+            LOG.warn("Executor did not terminate in time, forcing shutdown...");
+            executorService.shutdownNow();
+
+            boolean terminatedAfterForce = executorService.awaitTermination(timeout, unit);
+            if (!terminatedAfterForce)
+            {
+                LOG.error("Executor did not terminate after force shutdown.");
+            }
+        }
+        catch (InterruptedException e)
+        {
+            LOG.error("Interrupted during shutdown. Forcing shutdown now...", e);
+            executorService.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/application/src/main/resources/ecc.yml b/application/src/main/resources/ecc.yml
index 34a0f9701..378041bd9 100644
--- a/application/src/main/resources/ecc.yml
+++ b/application/src/main/resources/ecc.yml
@@ -93,6 +93,26 @@ connection:
     ## The default provider will be used unless another is specified.
## provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider + retryPolicy: + ## Maximum number of attempts ecChronos makes to connect to Cassandra. + maxAttempts: 5 + ## Delay to wait between one attempt and the next. This value is multiplied by twice the current attempt count. + ## For example, if the current attempt is 4 and the delay is 5 seconds, the calculated delay is (4 (attempt) x 2) x 5 (delay) = 40 seconds. + ## If the calculated delay is greater than maxDelay, maxDelay will be used instead of the calculated delay. + delay: 5 + ## Maximum delay before the next connection attempt is made. + ## Setting it to 0 disables maxDelay, and the delay interval is then + ## calculated from the attempt count and the delay alone. + maxDelay: 30 + unit: seconds + # Initial delay before the RetrySchedulerService starts after the application launches. + # This is expressed in the configured unit (seconds by default) and represents the wait time before the service first runs. + initialDelay: 86400 + # Fixed delay between subsequent executions of the RetrySchedulerService. + # This value represents the delay time (in the configured unit, seconds by default) between when the service completes one execution + # and the time it will be triggered again for another execution. + # During each execution, the service will check for unavailable nodes and attempt reconnections as per the retry logic. 
+ fixedDelay: 86400 rest_server: ## diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java index f299ebe48..dfc469d18 100644 --- a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java @@ -14,10 +14,9 @@ */ package com.ericsson.bss.cassandra.ecchronos.application.config; -import com.ericsson.bss.cassandra.ecchronos.application.config.connection.AgentConnectionConfig; -import com.ericsson.bss.cassandra.ecchronos.application.config.connection.ConnectionConfig; -import com.ericsson.bss.cassandra.ecchronos.application.config.connection.DistributedNativeConnection; +import com.ericsson.bss.cassandra.ecchronos.application.config.connection.*; import com.ericsson.bss.cassandra.ecchronos.application.exceptions.ConfigurationException; +import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.connection.DataCenterAwarePolicy; import com.fasterxml.jackson.core.exc.StreamReadException; @@ -32,6 +31,7 @@ import java.io.IOException; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; public class TestConfig @@ -39,6 +39,7 @@ public class TestConfig private static final String DEFAULT_AGENT_FILE_NAME = "all_set.yml"; private static Config config; private static DistributedNativeConnection nativeConnection; + private static DistributedJmxConnection distributedJmxConnection; @Before public void setup() throws StreamReadException, DatabindException, IOException @@ -54,6 +55,7 @@ public void setup() throws 
StreamReadException, DatabindException, IOException ConnectionConfig connection = config.getConnectionConfig(); nativeConnection = connection.getCqlConnection(); + distributedJmxConnection = connection.getJmxConnection(); } @Test @@ -85,7 +87,8 @@ public void testDefaultDatacenterAware() .getAgentConnectionConfig() .getDatacenterAware() .getDatacenters() - .get("datacenter1").getName()).isEqualTo("datacenter1"); + .get("datacenter1") + .getName()).isEqualTo("datacenter1"); } @Test @@ -95,9 +98,9 @@ public void testDefaultRackAware() assertThat(nativeConnection .getAgentConnectionConfig() .getRackAware() - .getRacks().get("rack1") - .getDatacenterName() - ).isEqualTo("datacenter1"); + .getRacks() + .get("rack1") + .getDatacenterName()).isEqualTo("datacenter1"); } @Test @@ -106,26 +109,34 @@ public void testDefaultHostAware() assertThat(nativeConnection.getAgentConnectionConfig().getHostAware()).isNotNull(); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.1").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.1") + .getPort()) .isEqualTo(9042); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.2").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.2") + .getPort()) .isEqualTo(9042); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.3").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.3") + .getPort()) .isEqualTo(9042); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.4").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.4") + .getPort()) .isEqualTo(9042); } @@ -139,7 +150,8 @@ public void testAgentProviderConfig() @Test public void testConfigurationExceptionForWrongAgentType() { - assertThrows(ConfigurationException.class, () -> { + assertThrows(ConfigurationException.class, () -> + { 
nativeConnection.getAgentConnectionConfig().setType("wrongType"); }); } @@ -156,4 +168,19 @@ public void testDefaultLoadBalancingPolicy() { assertThat(nativeConnection.getAgentConnectionConfig().getDatacenterAwarePolicy()).isEqualTo(DataCenterAwarePolicy.class); } -} \ No newline at end of file + + @Test + public void testRetryPolicyConfig() + { + Class providerClass = distributedJmxConnection.getProviderClass(); + assertThat(providerClass).isEqualTo(AgentJmxConnectionProvider.class); + RetryPolicyConfig retryPolicyConfig = distributedJmxConnection.getRetryPolicyConfig(); + assertNotNull(retryPolicyConfig); + assertThat(5).isEqualTo(retryPolicyConfig.getMaxAttempts()); + assertThat(5000).isEqualTo(retryPolicyConfig.getDelay()); + assertThat(30000).isEqualTo(retryPolicyConfig.getMaxDelay()); + assertThat("seconds").isEqualTo(retryPolicyConfig.getUnit()); + assertThat(86400000).isEqualTo(retryPolicyConfig.getInitialDelay()); + assertThat(86400000).isEqualTo(retryPolicyConfig.getFixedDelay()); + } +} diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerServiceTest.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerServiceTest.java new file mode 100644 index 000000000..d72e06029 --- /dev/null +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerServiceTest.java @@ -0,0 +1,205 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.application.spring;
+
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.ericsson.bss.cassandra.ecchronos.application.config.Config;
+import com.ericsson.bss.cassandra.ecchronos.application.config.connection.ConnectionConfig;
+import com.ericsson.bss.cassandra.ecchronos.application.config.connection.DistributedJmxConnection;
+import com.ericsson.bss.cassandra.ecchronos.application.config.connection.RetryPolicyConfig;
+import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
+import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
+import com.ericsson.bss.cassandra.ecchronos.data.enums.NodeStatus;
+import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import javax.management.remote.JMXConnector;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for {@link RetrySchedulerService} that run against mocked collaborators.
+ */
+class RetrySchedulerServiceTest
+{
+    private static final String DATACENTER = "datacenter";
+
+    @Mock
+    private EccNodesSync eccNodesSync;
+    @Mock
+    private Config config;
+    @Mock
+    private DistributedJmxConnectionProvider jmxConnectionProvider;
+    @Mock
+    private DistributedNativeConnectionProvider nativeConnectionProvider;
+    @Mock
+    private RetryPolicyConfig retryPolicyConfig;
+    private RetrySchedulerService retrySchedulerService;
+
+    @BeforeEach
+    void setUp()
+    {
+        MockitoAnnotations.openMocks(this);
+
+        // Mock nested method calls for Config
+        ConnectionConfig connectionConfig = mock(ConnectionConfig.class);
+        DistributedJmxConnection jmxConnection = mock(DistributedJmxConnection.class);
+
+        when(config.getConnectionConfig()).thenReturn(connectionConfig);
+        when(connectionConfig.getJmxConnection()).thenReturn(jmxConnection);
+        when(jmxConnection.getRetryPolicyConfig()).thenReturn(retryPolicyConfig);
+
+        // Mock behavior for retryPolicyConfig
+        when(retryPolicyConfig.getInitialDelay()).thenReturn(1000L);
+        when(retryPolicyConfig.getFixedDelay()).thenReturn(1000L);
+        when(retryPolicyConfig.getMaxAttempts()).thenReturn(3);
+        when(retryPolicyConfig.getDelay()).thenReturn(500L);
+        when(retryPolicyConfig.getMaxDelay()).thenReturn(5000L);
+
+        // Initialize RetrySchedulerService with new RetryBackoffStrategy
+        retrySchedulerService = new RetrySchedulerService(eccNodesSync, config, jmxConnectionProvider, nativeConnectionProvider);
+    }
+
+    /**
+     * Stubs the node table and the driver metadata so that exactly one node, identified by
+     * {@code nodeId}, is reported as UNAVAILABLE.
+     *
+     * @return the mocked driver node
+     */
+    private Node stubSingleUnavailableNode(final UUID nodeId)
+    {
+        Row row = mock(Row.class);
+        when(row.getString("node_status")).thenReturn(NodeStatus.UNAVAILABLE.name());
+        when(row.getUuid("node_id")).thenReturn(nodeId);
+
+        ResultSet resultSet = mock(ResultSet.class);
+        when(resultSet.iterator()).thenReturn(Collections.singletonList(row).iterator());
+        when(eccNodesSync.getResultSet()).thenReturn(resultSet);
+
+        Node node = mock(Node.class);
+        when(node.getHostId()).thenReturn(nodeId);
+        when(node.getDatacenter()).thenReturn(DATACENTER);
+        when(nativeConnectionProvider.getNodes()).thenReturn(Collections.singletonList(node));
+        return node;
+    }
+
+    @Test
+    void testSchedulerStart()
+    {
+        // Call the method under test
+        retrySchedulerService.startScheduler();
+        // Verify that the scheduler reads initialDelay and fixedDelay from the policy exactly once
+        verify(retryPolicyConfig, times(1)).getInitialDelay();
+        verify(retryPolicyConfig, times(1)).getFixedDelay();
+    }
+
+    @Test
+    void testRetryNodesWhenNoUnavailableNodes()
+    {
+        // Mock ResultSet to simulate no unavailable nodes
+        ResultSet mockResultSet = mock(ResultSet.class);
+        when(eccNodesSync.getResultSet()).thenReturn(mockResultSet);
+        when(mockResultSet.iterator()).thenReturn(Collections.emptyIterator());
+
+        retrySchedulerService.retryNodes();
+
+        // Verify that no node status is updated and no reconnect is attempted
+        verify(eccNodesSync, never()).updateNodeStatus(any(), anyString(), any());
+        verify(jmxConnectionProvider, never()).getJmxConnector(any());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked") // mock(ConcurrentHashMap.class) cannot carry the type arguments
+    void testRetryNodesWithUnavailableNodeWhenConnectionSuccessful() throws IOException
+    {
+        UUID nodeId = UUID.randomUUID();
+        stubSingleUnavailableNode(nodeId);
+
+        // Mock JMX connector behavior
+        JMXConnector mockJmxConnector = mock(JMXConnector.class);
+        when(jmxConnectionProvider.getJmxConnector(nodeId)).thenReturn(mockJmxConnector);
+        when(mockJmxConnector.getConnectionId()).thenReturn("connected");
+
+        // Mock the JMX connections map
+        ConcurrentHashMap<UUID, JMXConnector> mockJmxConnections = mock(ConcurrentHashMap.class);
+        when(jmxConnectionProvider.getJmxConnections()).thenReturn(mockJmxConnections);
+
+        // Call the method under test
+        retrySchedulerService.retryNodes();
+
+        // Ensure the status update call is made with expected parameters
+        verify(eccNodesSync).updateNodeStatus(NodeStatus.AVAILABLE, DATACENTER, nodeId);
+
+        // Verify JMXConnector is added to the map
+        verify(mockJmxConnections).put(eq(nodeId), any(JMXConnector.class));
+    }
+
+    @Test
+    void testRetryNodesWithUnavailableNodeWhenConnectionFailed()
+    {
+        UUID nodeId = UUID.randomUUID();
+        stubSingleUnavailableNode(nodeId);
+
+        // Mock the JMX connection behavior for a failed reconnection
+        when(jmxConnectionProvider.getJmxConnector(nodeId)).thenReturn(null);
+
+        retrySchedulerService.retryNodes();
+
+        // Check the node status update to UNREACHABLE
+        verify(eccNodesSync).updateNodeStatus(eq(NodeStatus.UNREACHABLE), anyString(), eq(nodeId));
+    }
+
+    @Test
+    void testMaxRetryAttemptsReached()
+    {
+        UUID nodeId = UUID.randomUUID();
+        stubSingleUnavailableNode(nodeId);
+
+        // Simulate failed reconnects up to max attempts
+        when(jmxConnectionProvider.getJmxConnector(nodeId)).thenReturn(null);
+
+        // Call the method under test
+        retrySchedulerService.retryNodes();
+
+        // Verify that the node is marked as UNREACHABLE after reaching max retry attempts
+        verify(eccNodesSync).updateNodeStatus(NodeStatus.UNREACHABLE, DATACENTER, nodeId);
+    }
+
+    @Test
+    void testDestroyShutdownService()
+    {
+        retrySchedulerService.destroy();
+        assertTrue(retrySchedulerService.getMyScheduler().isShutdown());
+    }
+}
diff --git a/application/src/test/resources/all_set.yml b/application/src/test/resources/all_set.yml
index e7bd0aad3..6a0aa5fbc 100644
--- 
a/application/src/test/resources/all_set.yml +++ b/application/src/test/resources/all_set.yml @@ -43,6 +43,14 @@ connection: port: 9042 provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider jmx: + provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider + retryPolicy: + maxAttempts: 5 + delay: 5 + maxDelay: 30 + unit: seconds + initialDelay: 86400 + fixedDelay: 86400 rest_server: host: 127.0.0.2 diff --git a/check_style.xml b/check_style.xml index c6cd70e7e..7ea324ee1 100644 --- a/check_style.xml +++ b/check_style.xml @@ -61,7 +61,7 @@ - + diff --git a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java index 706a5998a..845b35e1e 100644 --- a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java +++ b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java @@ -20,5 +20,6 @@ public enum NodeStatus { UNAVAILABLE, - AVAILABLE + AVAILABLE, + UNREACHABLE } diff --git a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java index 4b70b0878..389b3ed71 100644 --- a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java +++ b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java @@ -16,8 +16,8 @@ import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.metadata.Node; import 
com.datastax.oss.driver.api.querybuilder.QueryBuilder; @@ -35,8 +35,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; /** * CQL Definition for nodes_sync table. CREATE TABLE ecchronos_agent.nodes_sync ( ecchronos_id TEXT, datacenter_name @@ -66,6 +66,7 @@ public final class EccNodesSync private final PreparedStatement myCreateStatement; private final PreparedStatement myUpdateStatusStatement; + private final PreparedStatement mySelectStatusStatement; private EccNodesSync(final Builder builder) throws UnknownHostException { @@ -91,9 +92,20 @@ private EccNodesSync(final Builder builder) throws UnknownHostException .whereColumn(COLUMN_NODE_ID).isEqualTo(bindMarker()) .build() .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)); + mySelectStatusStatement = mySession.prepare(selectFrom(KEYSPACE_NAME, TABLE_NAME) + .columns(COLUMN_NODE_ID, COLUMN_NODE_ENDPOINT, COLUMN_DC_NAME, COLUMN_NODE_STATUS) + .whereColumn(COLUMN_ECCHRONOS_ID).isEqualTo(bindMarker()) + .build()); ecChronosID = builder.myEcchronosID; } + /** Executes the prepared status select for this instance's ecChronos id and returns the driver result as-is. */ + public ResultSet getResultSet() + { + // The ecchronos_id predicate is the statement's only bind marker + return mySession.execute(mySelectStatusStatement.bind(ecChronosID)); + } + public void acquireNodes() throws EcChronosException { if (myNodesList.isEmpty()) diff --git a/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java b/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java index daf0ea662..865cb387d 100644 --- a/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java +++ b/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java @@ -145,4 +145,13 @@ public void testEccNodesWithNullSession() NullPointerException.class, tmpEccNodesSyncBuilder::build);
assertEquals("Session cannot be null", exception.getMessage()); } + + @Test + public void testGetAllNodes() + { + // Smoke test: seeds one AVAILABLE and one UNAVAILABLE row, then checks the select yields a result set; row contents are not asserted + eccNodesSync.verifyInsertNodeInfo(datacenterName, "127.0.0.1", NodeStatus.AVAILABLE.name(), Instant.now(), Instant.now().plus(30, ChronoUnit.MINUTES), UUID.randomUUID()); + eccNodesSync.verifyInsertNodeInfo(datacenterName, "127.0.0.2", NodeStatus.UNAVAILABLE.name(), Instant.now(), Instant.now().plus(30, ChronoUnit.MINUTES), UUID.randomUUID()); + assertNotNull(eccNodesSync.getResultSet()); + } }