From def53302aae92b23d516063356bd220573fefc1d Mon Sep 17 00:00:00 2001 From: Sajid <130126748+SajidRiaz138@users.noreply.github.com> Date: Tue, 10 Sep 2024 09:32:44 +0200 Subject: [PATCH] Retry Policy for Jmx Connection #700 (#710) - Retry Policy for jmx connection - Read all nodes statuses from nodes_sync table - Filter out all node with unavailable status - Retry to attempt jmx connection for unavailable nodes - If connection successful add connection and change status to available in table. - If connection failed after given retry attempts, change status to unreachable in table. - This is schedule service that would execute after given fix delay. By default it would run after every 24 hour and scan the table. Co-authored-by: sajid riaz --- CHANGES.md | 3 +- .../connection/DistributedJmxConnection.java | 22 +- .../config/connection/RetryPolicyConfig.java | 219 ++++++++++++++++++ .../application/spring/BeanConfigurator.java | 9 + .../spring/RetryBackoffStrategy.java | 72 ++++++ .../spring/RetrySchedulerService.java | 219 ++++++++++++++++++ .../spring/RetryServiceShutdownManager.java | 55 +++++ application/src/main/resources/ecc.yml | 26 +++ .../application/config/TestConfig.java | 114 +++++++-- .../spring/RetrySchedulerServiceTest.java | 211 +++++++++++++++++ application/src/test/resources/all_set.yml | 11 + .../src/test/resources/nothing_set.yml | 29 +++ check_style.xml | 2 +- .../ecchronos/data/enums/NodeStatus.java | 3 +- .../ecchronos/data/sync/EccNodesSync.java | 16 +- .../ecchronos/data/sync/TestEccNodesSync.java | 9 + 16 files changed, 995 insertions(+), 25 deletions(-) create mode 100644 application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java create mode 100644 application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryBackoffStrategy.java create mode 100644 application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java create mode 100644 application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryServiceShutdownManager.java create mode 100644 application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerServiceTest.java create mode 100644 application/src/test/resources/nothing_set.yml diff --git a/CHANGES.md b/CHANGES.md index 7bf2577e6..16a8b9e40 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Version 1.0.0 (Not yet Release) +* Retry Policy for Jmx Connection - Issue #700 * Update Architecture and Tests Documentations to Add the Agent Features and The cassandra-test-image - Issue #707 * Enhance Test Infrastructure by Adding Cassandra-Test-Image Module With Multi-Datacenter Cluster and Abstract Integration Test Class - Issue #706 * Investigate Introduction of testContainers - Issue #682 @@ -10,4 +11,4 @@ * Create JMXAgentConfig to add Hosts in JMX Session Through ecc.yml - Issue #675 * Expose AgentNativeConnectionProvider on Connection and Application Module - Issue #673 * Create DatacenterAwareConfig to add Hosts in CQL Session Through ecc.yml - Issue #671 -* Create Initial project Structure for Agent - Issue #695 +* Create Initial project Structure for Agent - Issue #695 diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java index 1e4548791..86b8ee918 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java @@ -19,10 +19,14 @@ import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import com.fasterxml.jackson.annotation.JsonProperty; + import java.util.function.Supplier; public class DistributedJmxConnection extends Connection { + private RetryPolicyConfig myRetryPolicyConfig = new RetryPolicyConfig(); + public DistributedJmxConnection() { try @@ -35,6 +39,18 @@ public DistributedJmxConnection() } } + @JsonProperty("retryPolicy") + public final RetryPolicyConfig getRetryPolicyConfig() + { + return myRetryPolicyConfig; + } + + @JsonProperty("retryPolicy") + public final void setRetryPolicyConfig(final RetryPolicyConfig retryPolicyConfig) + { + myRetryPolicyConfig = retryPolicyConfig; + } + /** * @return */ @@ -42,9 +58,9 @@ public DistributedJmxConnection() protected Class[] expectedConstructor() { return new Class[] { - Supplier.class, - DistributedNativeConnectionProvider.class, - EccNodesSync.class + Supplier.class, + DistributedNativeConnectionProvider.class, + EccNodesSync.class }; } } diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java new file mode 100644 index 000000000..db8fbd584 --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java @@ -0,0 +1,219 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.config.connection; + +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.annotation.JsonProperty; + +public final class RetryPolicyConfig +{ + public RetryPolicyConfig() + { + } + + private static final int DEFAULT_MAX_ATTEMPTS = 5; + private static final long DEFAULT_DELAY_IN_MS = 5000; + private static final long DEFAULT_MAX_DELAY_IN_MS = 30000; + private static final long DEFAULT_INITIAL_DELAY_IN_MS = 86400000; + private static final long DEFAULT_FIXED_DELAY_IN_MS = 86400000; + private static final TimeUnit DEFAULT_TIME_UNIT_IN_SECONDS = TimeUnit.SECONDS; + private RetryPolicyConfig.RetryDelay myRetryDelay = new RetryPolicyConfig.RetryDelay(); + private RetryPolicyConfig.RetrySchedule myRetrySchedule = new RetryPolicyConfig.RetrySchedule(); + + @JsonProperty("maxAttempts") + private Integer myMaxAttempts = DEFAULT_MAX_ATTEMPTS; + + @JsonProperty ("maxAttempts") + public Integer getMaxAttempts() + { + return myMaxAttempts; + } + + @JsonProperty("maxAttempts") + public void setMaxAttempts(final Integer maxAttempts) + { + if (maxAttempts != null) + { + this.myMaxAttempts = maxAttempts; + } + } + + @JsonProperty("delay") + public void setRetryDelay(final RetryDelay retryDelay) + { + myRetryDelay = retryDelay; + } + + @JsonProperty("delay") + public RetryDelay getRetryDelay() + { + return myRetryDelay; + } + + @JsonProperty("retrySchedule") + public RetrySchedule getRetrySchedule() + { + return myRetrySchedule; + } + + @JsonProperty("retrySchedule") + public void setRetrySchedule(final RetrySchedule retrySchedule) + { + myRetrySchedule = retrySchedule; + } + + private static long convertToMillis(final Long value, final TimeUnit unit) + { + return unit.toMillis(value); + } + + /** + * Configuration for retry delay parameter. + */ + public static final class RetryDelay + { + public RetryDelay() + { + + } + + @JsonProperty("start") + private long myDelay = DEFAULT_DELAY_IN_MS; + + @JsonProperty("max") + private long myMaxDelay = DEFAULT_MAX_DELAY_IN_MS; + + @JsonProperty("unit") + private TimeUnit myTimeUnit = DEFAULT_TIME_UNIT_IN_SECONDS; + + @JsonProperty("start") + public long getStartDelay() + { + return myDelay; + } + + @JsonProperty("start") + public void setStartDelay(final Long delay) + { + if (delay != null) + { + long convertedDelay = convertToMillis(delay, myTimeUnit); + if (convertedDelay > myMaxDelay) + { + throw new IllegalArgumentException("Start delay cannot be greater than max delay."); + } + this.myDelay = convertToMillis(delay, myTimeUnit); + } + } + + @JsonProperty("max") + public long getMaxDelay() + { + return myMaxDelay; + } + + @JsonProperty("max") + public void setMaxDelay(final Long maxDelay) + { + if (maxDelay != null) + { + long convertedMaxDelay = convertToMillis(maxDelay, myTimeUnit); + if (convertedMaxDelay < myDelay) + { + throw new IllegalArgumentException("Max delay cannot be less than start delay."); + } + this.myMaxDelay = convertToMillis(maxDelay, myTimeUnit); + } + } + + @JsonProperty("unit") + public TimeUnit getUnit() + { + return myTimeUnit; + } + + @JsonProperty("unit") + public void setTimeUnit(final String unit) + { + if (unit != null && !unit.isBlank()) + { + myTimeUnit = TimeUnit.valueOf(unit.toUpperCase(Locale.US)); + } + } + } + + public static final class RetrySchedule + { + public RetrySchedule() + { + + } + + @JsonProperty("initialDelay") + private long myInitialDelay = DEFAULT_INITIAL_DELAY_IN_MS; + + @JsonProperty("fixedDelay") + private long myFixedDelay = DEFAULT_FIXED_DELAY_IN_MS; + + @JsonProperty("unit") + private TimeUnit myTimeUnit = DEFAULT_TIME_UNIT_IN_SECONDS; + + @JsonProperty("initialDelay") + public long getInitialDelay() + { + return myInitialDelay; + } + + @JsonProperty("initialDelay") + public void setInitialDelay(final Long initialDelay) + { + if (initialDelay != null) + { + this.myInitialDelay = convertToMillis(initialDelay, myTimeUnit); + } + } + + @JsonProperty("fixedDelay") + public long getFixedDelay() + { + return myFixedDelay; + } + + @JsonProperty("fixedDelay") + public void setFixedDelay(final Long fixedDelay) + { + if (fixedDelay != null) + { + this.myFixedDelay = convertToMillis(fixedDelay, myTimeUnit); + } + } + + @JsonProperty("unit") + public TimeUnit getUnit() + { + return myTimeUnit; + } + + @JsonProperty("unit") + public void setTimeUnit(final String unit) + { + if (unit != null && !unit.isBlank()) + { + myTimeUnit = TimeUnit.valueOf(unit.toUpperCase(Locale.US)); + } + } + } +} diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java index 57e449bfb..d0a8ef5d5 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java @@ -208,6 +208,15 @@ public DistributedJmxConnectionProvider distributedJmxConnectionProvider( jmxSecurity::get, distributedNativeConnectionProvider, eccNodesSync); } + @Bean + public RetrySchedulerService retrySchedulerService(final Config config, + final DistributedJmxConnectionProvider jmxConnectionProvider, + final EccNodesSync eccNodesSync, + final DistributedNativeConnectionProvider nativeConnectionProvider) + { + return new RetrySchedulerService(eccNodesSync, config, jmxConnectionProvider, nativeConnectionProvider); + } + private Security getSecurityConfig() throws ConfigurationException { return ConfigurationHelper.DEFAULT_INSTANCE.getConfiguration(SECURITY_FILE, Security.class); diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryBackoffStrategy.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryBackoffStrategy.java new file mode 100644 index 000000000..ad22dcb4b --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryBackoffStrategy.java @@ -0,0 +1,72 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.spring; + +import com.ericsson.bss.cassandra.ecchronos.application.config.connection.RetryPolicyConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class RetryBackoffStrategy +{ + private static final Logger LOG = LoggerFactory.getLogger(RetryBackoffStrategy.class); + private static final int ONE_SECOND_IN_MS = 1000; + private static final int RETRY_DELAY_MULTIPLIER = 2; + private final RetryPolicyConfig myRetryPolicyConfig; + private final RetryPolicyConfig.RetrySchedule myRetrySchedule; + private final RetryPolicyConfig.RetryDelay myRetryDelay; + + public RetryBackoffStrategy(final RetryPolicyConfig retryPolicyConfig) + { + myRetryPolicyConfig = retryPolicyConfig; + myRetrySchedule = retryPolicyConfig.getRetrySchedule(); + myRetryDelay = retryPolicyConfig.getRetryDelay(); + } + + public long getInitialDelay() + { + return myRetrySchedule.getInitialDelay(); + } + + public long getFixedDelay() + { + return myRetrySchedule.getFixedDelay(); + } + + public int getMaxAttempts() + { + return myRetryPolicyConfig.getMaxAttempts(); + } + + public long calculateDelay(final int attempt) + { + long baseDelay = myRetryDelay.getStartDelay(); + long calculatedDelay = baseDelay * ((long) attempt * RETRY_DELAY_MULTIPLIER); + LOG.debug("Calculated delay for attempt {}: {} ms", attempt, calculatedDelay); + return Math.min(calculatedDelay, myRetryDelay.getMaxDelay() * ONE_SECOND_IN_MS); + } + + public void sleepBeforeNextRetry(final long delayMillis) + { + try + { + Thread.sleep(delayMillis); + } + catch (InterruptedException e) + { + LOG.error("Interrupted during sleep between retry attempts", e); + Thread.currentThread().interrupt(); + } + } +} diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java new file mode 100644 index 000000000..ce70146cb --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java @@ -0,0 +1,219 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.spring; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.application.config.Config; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.data.enums.NodeStatus; +import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import com.google.common.annotations.VisibleForTesting; +import jakarta.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.stereotype.Service; + +import javax.management.remote.JMXConnector; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Service responsible for managing and scheduling retry attempts to reconnect to Cassandra nodes that have become unavailable. + *

+ * This service periodically checks the status of nodes and attempts to reconnect based on a configurable retry policy. + * It uses a scheduled executor service to perform retries at fixed intervals, with the intervals and the retry logic + * configurable via external configurations. + *

+ * + *

+ * The retry logic involves calculating the delay between attempts, which increases with each subsequent retry for a node. + * If the maximum number of retry attempts is reached, the node is marked as unreachable. + *

+ * + *

+ * This service is designed to run continuously in the background, adjusting its behavior based on the state of the + * Cassandra cluster and the provided configurations. It also ensures that resources are properly cleaned up on shutdown. + *

+ */ + +@Service +public final class RetrySchedulerService implements DisposableBean +{ + + private static final Logger LOG = LoggerFactory.getLogger(RetrySchedulerService.class); + private static final String COLUMN_NODE_ID = "node_id"; + private static final String COLUMN_NODE_STATUS = "node_status"; + private static final int DEFAULT_SCHEDULER_AWAIT_TERMINATION_IN_SECONDS = 60; + private final EccNodesSync myEccNodesSync; + private final DistributedJmxConnectionProvider myJmxConnectionProvider; + private final DistributedNativeConnectionProvider myDistributedNativeConnectionProvider; + private final RetryBackoffStrategy retryBackoffStrategy; + private final ScheduledExecutorService myScheduler = Executors.newScheduledThreadPool(1); + + public RetrySchedulerService(final EccNodesSync eccNodesSync, + final Config config, + final DistributedJmxConnectionProvider jmxConnectionProvider, + final DistributedNativeConnectionProvider distributedNativeConnectionProvider) + { + this.myEccNodesSync = eccNodesSync; + this.myJmxConnectionProvider = jmxConnectionProvider; + this.myDistributedNativeConnectionProvider = distributedNativeConnectionProvider; + this.retryBackoffStrategy = new RetryBackoffStrategy(config.getConnectionConfig().getJmxConnection().getRetryPolicyConfig()); + } + + @PostConstruct + public void startScheduler() + { + long initialDelay = retryBackoffStrategy.getInitialDelay(); + long fixedDelay = retryBackoffStrategy.getFixedDelay(); + + LOG.debug("Starting RetrySchedulerService with initialDelay={} ms and fixedDelay={} ms", initialDelay, fixedDelay); + + myScheduler.scheduleWithFixedDelay(this::retryNodes, initialDelay, fixedDelay, TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + void retryNodes() + { + LOG.warn("Retrying unavailable nodes"); + List unavailableNodes = findUnavailableNodes(); + + if (unavailableNodes.isEmpty()) + { + return; + } + + unavailableNodes.forEach(this::retryConnectionForNode); + } + + private List findUnavailableNodes() + { + List unavailableNodes = new ArrayList<>(); + ResultSet resultSet = myEccNodesSync.getResultSet(); + + for (Row row : resultSet) + { + UUID nodeId = row.getUuid(COLUMN_NODE_ID); + String status = Objects.requireNonNull(row.getString(COLUMN_NODE_STATUS)).toUpperCase(Locale.ENGLISH); + + if (NodeStatus.UNAVAILABLE.name().equals(status)) + { + myDistributedNativeConnectionProvider.getNodes() + .stream() + .filter(node -> Objects.equals(node.getHostId(), nodeId)) + .findFirst() + .ifPresent(unavailableNodes::add); + } + } + + return unavailableNodes; + } + + private void retryConnectionForNode(final Node node) + { + UUID nodeId = node.getHostId(); + for (int attempt = 1; attempt <= retryBackoffStrategy.getMaxAttempts(); attempt++) + { + if (tryReconnectToNode(node, nodeId, attempt)) + { + return; // Successfully reconnected, exit method + } + } + markNodeUnreachable(node, nodeId); + } + + private boolean tryReconnectToNode(final Node node, final UUID nodeId, final int attempt) + { + long delayMillis = retryBackoffStrategy.calculateDelay(attempt); + LOG.warn("Attempting to reconnect to node: {}, attempt: {}", nodeId, attempt); + + if (establishConnectionToNode(node)) + { + LOG.info("Successfully reconnected to node: {}", nodeId); + myEccNodesSync.updateNodeStatus(NodeStatus.AVAILABLE, node.getDatacenter(), nodeId); + return true; + } + else + { + LOG.warn("Failed to reconnect to node: {}, next retry in {} ms", nodeId, delayMillis); + retryBackoffStrategy.sleepBeforeNextRetry(delayMillis); + return false; + } + } + + private void markNodeUnreachable(final Node node, final UUID nodeId) + { + LOG.error("Max retry attempts reached for node: {}. Marking as UNREACHABLE.", nodeId); + myEccNodesSync.updateNodeStatus(NodeStatus.UNREACHABLE, node.getDatacenter(), nodeId); + } + + private boolean establishConnectionToNode(final Node node) + { + UUID nodeId = node.getHostId(); + JMXConnector jmxConnector = myJmxConnectionProvider.getJmxConnector(nodeId); + boolean isConnected = jmxConnector != null && isConnected(jmxConnector); + + if (isConnected) + { + myJmxConnectionProvider.getJmxConnections().put(nodeId, jmxConnector); + LOG.info("Node {} connected successfully.", nodeId); + } + else + { + LOG.warn("Failed to connect to node {}.", nodeId); + } + + return isConnected; + } + + private boolean isConnected(final JMXConnector jmxConnector) + { + try + { + jmxConnector.getConnectionId(); + return true; + } + catch (IOException e) + { + LOG.error("Error while checking connection for JMX connector", e); + return false; + } + } + + @Override + public void destroy() + { + LOG.info("Shutting down RetrySchedulerService..."); + RetryServiceShutdownManager.shutdownExecutorService(myScheduler, DEFAULT_SCHEDULER_AWAIT_TERMINATION_IN_SECONDS, TimeUnit.SECONDS); + LOG.info("RetrySchedulerService shut down complete."); + } + + @VisibleForTesting + ScheduledExecutorService getMyScheduler() + { + return myScheduler; + } +} diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryServiceShutdownManager.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryServiceShutdownManager.java new file mode 100644 index 000000000..9187b4ca5 --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetryServiceShutdownManager.java @@ -0,0 +1,55 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.spring; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class RetryServiceShutdownManager +{ + private static final Logger LOG = LoggerFactory.getLogger(RetryServiceShutdownManager.class); + + private RetryServiceShutdownManager() + { + + } + + public static void shutdownExecutorService(final ScheduledExecutorService executorService, + final int timeout, + final TimeUnit unit) + { + executorService.shutdown(); + try + { + if (!executorService.awaitTermination(timeout, unit)) + { + LOG.warn("Executor did not terminate in time, forcing shutdown..."); + executorService.shutdownNow(); + if (!executorService.awaitTermination(timeout, unit)) + { + LOG.error("Executor did not terminate after force shutdown."); + } + } + } + catch (InterruptedException e) + { + LOG.error("Interrupted during shutdown. Forcing shutdown now...", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} diff --git a/application/src/main/resources/ecc.yml b/application/src/main/resources/ecc.yml index 34a0f9701..b6f0c2673 100644 --- a/application/src/main/resources/ecc.yml +++ b/application/src/main/resources/ecc.yml @@ -93,6 +93,32 @@ connection: ## The default provider will be used unless another is specified. ## provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider + retryPolicy: + ## Max number of attempts ecChronos will try to connect with Cassandra. + maxAttempts: 5 + ## Delay use to wait between an attempt and another, this value will be multiplied by the current attempt count powered by two. + ## If the current attempt is 4 and the default delay is 5 seconds, so ((4(attempt) x 2) x 5(default delay)) = 40 seconds. + ## If the calculated delay is greater than maxDelay, maxDelay will be used instead of the calculated delay. + delay: + start: 5 + ## Maximum delay before the next connection attempt is made. + ## Setting it as 0 will disable maxDelay and the delay interval will + ## be calculated based on the attempt count and the default delay. + max: 30 + ## unit can be milliseconds, seconds, minutes, hours, days. + unit: seconds + # RetrySchedulerService schedule starts after the application launches. + retrySchedule: + ## Initial delay before the RetrySchedulerService starts after the application launches. + ## By default, this service would trigger after 1 day and represents the wait time before the service first runs + initialDelay: 1 + ## Fixed delay between subsequent executions of the RetrySchedulerService. + ## This value represents the delay time (by default 1 day) between when the service completes one execution + ## and the time it will be triggered again for another execution. + ## During each execution, the service will check for unavailable nodes and attempt reconnections as per the retry logic. + fixedDelay: 1 + ## unit can be milliseconds, seconds, minutes, hours, days. + unit: days rest_server: ## diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java index f299ebe48..c1a88fa21 100644 --- a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java @@ -14,10 +14,9 @@ */ package com.ericsson.bss.cassandra.ecchronos.application.config; -import com.ericsson.bss.cassandra.ecchronos.application.config.connection.AgentConnectionConfig; -import com.ericsson.bss.cassandra.ecchronos.application.config.connection.ConnectionConfig; -import com.ericsson.bss.cassandra.ecchronos.application.config.connection.DistributedNativeConnection; +import com.ericsson.bss.cassandra.ecchronos.application.config.connection.*; import com.ericsson.bss.cassandra.ecchronos.application.exceptions.ConfigurationException; +import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.connection.DataCenterAwarePolicy; import com.fasterxml.jackson.core.exc.StreamReadException; @@ -25,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; @@ -32,13 +32,18 @@ import java.io.IOException; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; public class TestConfig { private static final String DEFAULT_AGENT_FILE_NAME = "all_set.yml"; + private static final String NOTHING_SET_AGENT_FILE_NAME = "nothing_set.yml"; + private static final TimeUnit TIME_UNIT_IN_SECONDS = TimeUnit.SECONDS; private static Config config; private static DistributedNativeConnection nativeConnection; + private static DistributedJmxConnection distributedJmxConnection; @Before public void setup() throws StreamReadException, DatabindException, IOException @@ -54,6 +59,7 @@ public void setup() throws StreamReadException, DatabindException, IOException ConnectionConfig connection = config.getConnectionConfig(); nativeConnection = connection.getCqlConnection(); + distributedJmxConnection = connection.getJmxConnection(); } @Test @@ -85,7 +91,8 @@ public void testDefaultDatacenterAware() .getAgentConnectionConfig() .getDatacenterAware() .getDatacenters() - .get("datacenter1").getName()).isEqualTo("datacenter1"); + .get("datacenter1") + .getName()).isEqualTo("datacenter1"); } @Test @@ -95,9 +102,9 @@ public void testDefaultRackAware() assertThat(nativeConnection .getAgentConnectionConfig() .getRackAware() - .getRacks().get("rack1") - .getDatacenterName() - ).isEqualTo("datacenter1"); + .getRacks() + .get("rack1") + .getDatacenterName()).isEqualTo("datacenter1"); } @Test @@ -106,26 +113,34 @@ public void testDefaultHostAware() assertThat(nativeConnection.getAgentConnectionConfig().getHostAware()).isNotNull(); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.1").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.1") + .getPort()) .isEqualTo(9042); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.2").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.2") + .getPort()) .isEqualTo(9042); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.3").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.3") + .getPort()) .isEqualTo(9042); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.4").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.4") + .getPort()) .isEqualTo(9042); } @@ -139,7 +154,8 @@ public void testAgentProviderConfig() @Test public void testConfigurationExceptionForWrongAgentType() { - assertThrows(ConfigurationException.class, () -> { + assertThrows(ConfigurationException.class, () -> + { nativeConnection.getAgentConnectionConfig().setType("wrongType"); }); } @@ -156,4 +172,68 @@ public void testDefaultLoadBalancingPolicy() { assertThat(nativeConnection.getAgentConnectionConfig().getDatacenterAwarePolicy()).isEqualTo(DataCenterAwarePolicy.class); } -} \ No newline at end of file + + @Test + public void testRetryPolicyConfig() + { + Class providerClass = distributedJmxConnection.getProviderClass(); + assertThat(providerClass).isEqualTo(AgentJmxConnectionProvider.class); + RetryPolicyConfig retryPolicyConfig = distributedJmxConnection.getRetryPolicyConfig(); + assertNotNull(retryPolicyConfig); + assertThat(5).isEqualTo(retryPolicyConfig.getMaxAttempts()); + assertThat(5000).isEqualTo(retryPolicyConfig.getRetryDelay().getStartDelay()); + assertThat(30000).isEqualTo(retryPolicyConfig.getRetryDelay().getMaxDelay()); + assertThat(TIME_UNIT_IN_SECONDS).isEqualTo(retryPolicyConfig.getRetryDelay().getUnit()); + assertThat(86400000).isEqualTo(retryPolicyConfig.getRetrySchedule().getInitialDelay()); + assertThat(86400000).isEqualTo(retryPolicyConfig.getRetrySchedule().getFixedDelay()); + assertThat(TIME_UNIT_IN_SECONDS).isEqualTo(retryPolicyConfig.getRetrySchedule().getUnit()); + } + + @Test + public void testRetryPolicyConfigWhenNothingSet() throws IOException + { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + File file = new File(classLoader.getResource(NOTHING_SET_AGENT_FILE_NAME).getFile()); + ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory()); + Config config = objectMapper.readValue(file, Config.class); + + ConnectionConfig connection = config.getConnectionConfig(); + distributedJmxConnection = connection.getJmxConnection(); + Class providerClass = distributedJmxConnection.getProviderClass(); + assertThat(providerClass).isEqualTo(AgentJmxConnectionProvider.class); + RetryPolicyConfig retryPolicyConfig = distributedJmxConnection.getRetryPolicyConfig(); + assertNotNull(retryPolicyConfig); + assertThat(5).isEqualTo(retryPolicyConfig.getMaxAttempts()); + assertThat(5000).isEqualTo(retryPolicyConfig.getRetryDelay().getStartDelay()); + assertThat(30000).isEqualTo(retryPolicyConfig.getRetryDelay().getMaxDelay()); + assertThat(TIME_UNIT_IN_SECONDS).isEqualTo(retryPolicyConfig.getRetryDelay().getUnit()); + assertThat(86400000).isEqualTo(retryPolicyConfig.getRetrySchedule().getInitialDelay()); + assertThat(86400000).isEqualTo(retryPolicyConfig.getRetrySchedule().getFixedDelay()); + assertThat(TIME_UNIT_IN_SECONDS).isEqualTo(retryPolicyConfig.getRetrySchedule().getUnit()); + } + + @Test + public void testStartDelayGreaterThanMaxDelayThrowsException() + { + RetryPolicyConfig.RetryDelay retryDelay = new RetryPolicyConfig.RetryDelay(); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + { + retryDelay.setMaxDelay(1000L); + retryDelay.setStartDelay(2000L); + }); + assertEquals("Start delay cannot be greater than max delay.", exception.getMessage()); + } + + @Test + public void testMaxDelayLessThanStartDelayThrowsException() + { + RetryPolicyConfig.RetryDelay retryDelay = new RetryPolicyConfig.RetryDelay(); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + { + retryDelay.setMaxDelay(3000L); + retryDelay.setStartDelay(2000L); + retryDelay.setMaxDelay(1000L); + }); + assertEquals("Max delay cannot be less than start delay.", exception.getMessage()); + } +} diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerServiceTest.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerServiceTest.java new file mode 100644 index 000000000..ffaceddef --- /dev/null +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerServiceTest.java @@ -0,0 +1,211 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.spring; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.application.config.Config; +import com.ericsson.bss.cassandra.ecchronos.application.config.connection.ConnectionConfig; +import com.ericsson.bss.cassandra.ecchronos.application.config.connection.DistributedJmxConnection; +import com.ericsson.bss.cassandra.ecchronos.application.config.connection.RetryPolicyConfig; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.data.enums.NodeStatus; +import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import javax.management.remote.JMXConnector; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +class RetrySchedulerServiceTest +{ + @Mock + private EccNodesSync eccNodesSync; + @Mock + private Config config; + @Mock + private DistributedJmxConnectionProvider jmxConnectionProvider; + @Mock + private DistributedNativeConnectionProvider nativeConnectionProvider; + @Mock + private RetryPolicyConfig retryPolicyConfig; + @Mock + private RetryPolicyConfig.RetryDelay retryDelay; + @Mock + private RetryPolicyConfig.RetrySchedule retrySchedule; + private RetrySchedulerService retrySchedulerService; + + @BeforeEach + void setUp() + { + MockitoAnnotations.openMocks(this); + + // Mock nested method calls for Config + ConnectionConfig connectionConfig = mock(ConnectionConfig.class); + DistributedJmxConnection jmxConnection = mock(DistributedJmxConnection.class); + + when(config.getConnectionConfig()).thenReturn(connectionConfig); + when(connectionConfig.getJmxConnection()).thenReturn(jmxConnection); + when(retryDelay.getStartDelay()).thenReturn(500L); + when(retryDelay.getMaxDelay()).thenReturn(5000L); + when(jmxConnection.getRetryPolicyConfig()).thenReturn(retryPolicyConfig); + + // Mock behavior for retryPolicyConfig + when(retrySchedule.getInitialDelay()).thenReturn(1000L); + when(retrySchedule.getFixedDelay()).thenReturn(1000L); + when(retryPolicyConfig.getMaxAttempts()).thenReturn(3); + when(retryPolicyConfig.getRetryDelay()).thenReturn(retryDelay); + when(retryPolicyConfig.getRetrySchedule()).thenReturn(retrySchedule); + + // Initialize RetrySchedulerService with new RetryBackoffStrategy + retrySchedulerService = new RetrySchedulerService(eccNodesSync, config, jmxConnectionProvider, nativeConnectionProvider); + } + + @Test + void testSchedulerStart() + { + // Call the method under test + retrySchedulerService.startScheduler(); + // Verify that the scheduler starts correctly with the correct initialDelay and fixedDelay + verify(retrySchedule, times(1)).getInitialDelay(); + verify(retrySchedule, times(1)).getFixedDelay(); + } + + @Test + void testRetryNodesWhenNoUnavailableNodes() + { + // Mock ResultSet and Row to simulate no unavailable nodes + ResultSet mockResultSet = mock(ResultSet.class); + when(eccNodesSync.getResultSet()).thenReturn(mockResultSet); + when(mockResultSet.iterator()).thenReturn(Collections.emptyIterator()); + + retrySchedulerService.retryNodes(); + + // Verify that no nodes are marked as UNAVAILABLE or retried + verify(eccNodesSync, never()).updateNodeStatus(any(), anyString(), any()); + verify(jmxConnectionProvider, never()).getJmxConnector(any()); + } + + @Test + void testRetryNodesWithUnavailableNodeWhenConnectionSuccessful() throws IOException + { + // Create mock objects + ResultSet mockResultSet = mock(ResultSet.class); + Row mockRow = mock(Row.class); + UUID nodeId = UUID.randomUUID(); + + // Setup mock behavior for ResultSet and Row + when(mockRow.getString("node_status")).thenReturn(NodeStatus.UNAVAILABLE.name()); + when(mockRow.getUuid("node_id")).thenReturn(nodeId); + when(mockResultSet.iterator()).thenReturn(Collections.singletonList(mockRow).iterator()); + when(eccNodesSync.getResultSet()).thenReturn(mockResultSet); + + // Mock Node and its behavior + Node mockNode = mock(Node.class); + when(mockNode.getHostId()).thenReturn(nodeId); + when(mockNode.getDatacenter()).thenReturn("datacenter"); + when(nativeConnectionProvider.getNodes()).thenReturn(Collections.singletonList(mockNode)); + + // Mock JMX connector behavior + JMXConnector mockJmxConnector = mock(JMXConnector.class); + when(jmxConnectionProvider.getJmxConnector(nodeId)).thenReturn(mockJmxConnector); + when(mockJmxConnector.getConnectionId()).thenReturn("connected"); + + // Mock the JMX connections map + ConcurrentHashMap mockJmxConnections = mock(ConcurrentHashMap.class); + when(jmxConnectionProvider.getJmxConnections()).thenReturn(mockJmxConnections); + + // Call the method under test + retrySchedulerService.retryNodes(); + + // Verify interactions + // Ensure the status update call is made with expected parameters + verify(eccNodesSync).updateNodeStatus(NodeStatus.AVAILABLE, "datacenter", nodeId); + + // Verify JMXConnector is added to the map + verify(mockJmxConnections).put(eq(nodeId), any(JMXConnector.class)); + } + + @Test + void testRetryNodesWithUnavailableNodeWhenConnectionFailed() throws IOException + { + // Mock the node as unavailable + ResultSet mockResultSet = mock(ResultSet.class); + when(eccNodesSync.getResultSet()).thenReturn(mockResultSet); + Row mockRow = mock(Row.class); + when(mockRow.getString("node_status")).thenReturn("UNAVAILABLE"); + UUID nodeId = UUID.randomUUID(); + when(mockRow.getUuid("node_id")).thenReturn(nodeId); + when(mockResultSet.iterator()).thenReturn(Collections.singletonList(mockRow).iterator()); + + // Mock the nodes list to contain the unavailable node + Node mockNode = mock(Node.class); + when(mockNode.getHostId()).thenReturn(nodeId); + when(mockNode.getDatacenter()).thenReturn("datacenter"); + when(nativeConnectionProvider.getNodes()).thenReturn(Collections.singletonList(mockNode)); + + // Mock the JMX connection behavior for a failed reconnection + when(jmxConnectionProvider.getJmxConnector(nodeId)).thenReturn(null); + + retrySchedulerService.retryNodes(); + + // Check the node status update to UNREACHABLE + verify(eccNodesSync).updateNodeStatus(eq(NodeStatus.UNREACHABLE), anyString(), eq(nodeId)); + } + + @Test + void testMaxRetryAttemptsReached() throws IOException + { + // Mock the node as unavailable + ResultSet mockResultSet = mock(ResultSet.class); + when(eccNodesSync.getResultSet()).thenReturn(mockResultSet); + Row mockRow = mock(Row.class); + when(mockRow.getString("node_status")).thenReturn("UNAVAILABLE"); + UUID nodeId = UUID.randomUUID(); + when(mockRow.getUuid("node_id")).thenReturn(nodeId); + when(mockResultSet.iterator()).thenReturn(Collections.singletonList(mockRow).iterator()); + + // Mock the nodes list to contain the unavailable node + Node mockNode = mock(Node.class); + when(mockNode.getDatacenter()).thenReturn("datacenter"); + when(mockNode.getHostId()).thenReturn(nodeId); + when(nativeConnectionProvider.getNodes()).thenReturn(Collections.singletonList(mockNode)); + + // Simulate failed reconnects up to max attempts + when(jmxConnectionProvider.getJmxConnector(nodeId)).thenReturn(null); + + // Call the method under test + retrySchedulerService.retryNodes(); + + // Verify that the node is marked as UNREACHABLE after reaching max retry attempts + verify(eccNodesSync).updateNodeStatus(NodeStatus.UNREACHABLE, "datacenter", nodeId); + } + + @Test + void testDestroyShutdownService() + { + retrySchedulerService.destroy(); + assertTrue(retrySchedulerService.getMyScheduler().isShutdown()); + } +} diff --git a/application/src/test/resources/all_set.yml b/application/src/test/resources/all_set.yml index e7bd0aad3..53917e31a 100644 --- a/application/src/test/resources/all_set.yml +++ b/application/src/test/resources/all_set.yml @@ -43,6 +43,17 @@ connection: port: 9042 provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider jmx: + provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider + retryPolicy: + maxAttempts: 5 + delay: + start: 5 + max: 30 + unit: seconds + retrySchedule: + initialDelay: 86400 + fixedDelay: 86400 + unit: seconds rest_server: host: 127.0.0.2 diff --git a/application/src/test/resources/nothing_set.yml b/application/src/test/resources/nothing_set.yml new file mode 100644 index 000000000..65767b5b8 --- /dev/null +++ b/application/src/test/resources/nothing_set.yml @@ -0,0 +1,29 @@ +# +# Copyright 2024 Telefonaktiebolaget LM Ericsson +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +connection: + cql: + jmx: + provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider + retryPolicy: + maxAttempts: + delay: + start: + max: + unit: + retrySchedule: + initialDelay: + fixedDelay: + unit: \ No newline at end of file diff --git a/check_style.xml b/check_style.xml index c6cd70e7e..7ea324ee1 100644 --- a/check_style.xml +++ b/check_style.xml @@ -61,7 +61,7 @@ - + diff --git a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java index 706a5998a..845b35e1e 100644 --- a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java +++ b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java @@ -20,5 +20,6 @@ public enum NodeStatus { UNAVAILABLE, - AVAILABLE + AVAILABLE, + UNREACHABLE } diff --git a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java index 4b70b0878..389b3ed71 100644 --- a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java +++ b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java @@ -16,8 +16,8 @@ import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.querybuilder.QueryBuilder; @@ -35,8 +35,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; /** * CQL Definition for nodes_sync table. CREATE TABLE ecchronos_agent.nodes_sync ( ecchronos_id TEXT, datacenter_name @@ -66,6 +66,7 @@ public final class EccNodesSync private final PreparedStatement myCreateStatement; private final PreparedStatement myUpdateStatusStatement; + private final PreparedStatement mySelectStatusStatement; private EccNodesSync(final Builder builder) throws UnknownHostException { @@ -91,9 +92,20 @@ private EccNodesSync(final Builder builder) throws UnknownHostException .whereColumn(COLUMN_NODE_ID).isEqualTo(bindMarker()) .build() .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)); + mySelectStatusStatement = mySession.prepare(selectFrom(KEYSPACE_NAME, TABLE_NAME) + .columns(COLUMN_NODE_ID, COLUMN_NODE_ENDPOINT, COLUMN_DC_NAME, COLUMN_NODE_STATUS) + .whereColumn(COLUMN_ECCHRONOS_ID).isEqualTo(bindMarker()) + .build()); ecChronosID = builder.myEcchronosID; } + public ResultSet getResultSet() + { + // Bind the parameters + BoundStatement boundStatement = mySelectStatusStatement.bind(ecChronosID); + return mySession.execute(boundStatement); + } + public void acquireNodes() throws EcChronosException { if (myNodesList.isEmpty()) diff --git a/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java b/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java index daf0ea662..865cb387d 100644 --- a/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java +++ b/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java @@ -145,4 +145,13 @@ public void testEccNodesWithNullSession() NullPointerException.class, tmpEccNodesSyncBuilder::build); assertEquals("Session cannot be null", exception.getMessage()); } + + @Test + public void testGetAllNodes() + { + eccNodesSync.verifyInsertNodeInfo(datacenterName, "127.0.0.1", NodeStatus.AVAILABLE.name(), Instant.now(), Instant.now().plus(30, ChronoUnit.MINUTES), UUID.randomUUID()); + eccNodesSync.verifyInsertNodeInfo(datacenterName, "127.0.0.2", NodeStatus.UNAVAILABLE.name(), Instant.now(), Instant.now().plus(30, ChronoUnit.MINUTES), UUID.randomUUID()); + ResultSet resultSet = eccNodesSync.getResultSet(); + assertNotNull(resultSet); + } }