diff --git a/CHANGES.md b/CHANGES.md index 4c96673d4..9d1e61ef1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,8 @@ ## Version 1.0.0 (Not yet Release) +* Implement ReplicationStateImpl to Manage and Cache Token Range to Replica Mappings - Issue #719 +* Implement NodeResolverImpl to Resolve Nodes by IP Address and UUID - Issue #718 * Specify Interval for Next Connection - Issue #674 * Retry Policy for Jmx Connection - Issue #700 * Update Architecture and Tests Documentations to Add the Agent Features and The cassandra-test-image - Issue #707 diff --git a/application/pom.xml b/application/pom.xml index 175ba585d..1c1bad561 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -50,6 +50,18 @@ ${project.version} + + com.ericsson.bss.cassandra.ecchronos + core + ${project.version} + + + + com.ericsson.bss.cassandra.ecchronos + core.impl + ${project.version} + + org.springframework.boot diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java index 30f582e0a..2d7ebfb7c 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java @@ -14,11 +14,16 @@ */ package com.ericsson.bss.cassandra.ecchronos.application.spring; +import com.datastax.oss.driver.api.core.CqlSession; import com.ericsson.bss.cassandra.ecchronos.application.config.Interval; import com.ericsson.bss.cassandra.ecchronos.application.config.security.CqlTLSConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.security.ReloadingCertificateHandler; import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.state.ReplicationStateImpl; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver; +import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState; import com.ericsson.bss.cassandra.ecchronos.data.exceptions.EcChronosException; import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; @@ -220,6 +225,22 @@ public RetrySchedulerService retrySchedulerService(final Config config, return new RetrySchedulerService(eccNodesSync, config, jmxConnectionProvider, nativeConnectionProvider); } + @Bean + public NodeResolver nodeResolver(final DistributedNativeConnectionProvider distributedNativeConnectionProvider) + { + CqlSession session = distributedNativeConnectionProvider.getCqlSession(); + return new NodeResolverImpl(session); + } + + @Bean + public ReplicationState replicationState( + final DistributedNativeConnectionProvider distributedNativeConnectionProvider, + final NodeResolver nodeResolver) + { + CqlSession session = distributedNativeConnectionProvider.getCqlSession(); + return new ReplicationStateImpl(nodeResolver, session); + } + private Security getSecurityConfig() throws ConfigurationException { return ConfigurationHelper.DEFAULT_INSTANCE.getConfiguration(SECURITY_FILE, Security.class); diff --git a/core.impl/pom.xml b/core.impl/pom.xml new file mode 100644 index 000000000..675e2eb89 --- /dev/null +++ b/core.impl/pom.xml @@ -0,0 +1,102 @@ + + + + 4.0.0 + + com.ericsson.bss.cassandra.ecchronos + agent + 1.0.0-SNAPSHOT + + + core.impl + + + + + com.ericsson.bss.cassandra.ecchronos + core + ${project.version} + + + + com.ericsson.bss.cassandra.ecchronos + connection + ${project.version} + + + + com.datastax.oss + java-driver-core + + + + + com.datastax.oss + java-driver-query-builder + + + + com.google.guava + guava + + + + com.github.ben-manes.caffeine + caffeine + + + + + org.slf4j + slf4j-api + + + + + org.junit.vintage + junit-vintage-engine + test + + + + commons-io + commons-io + test + + + + org.awaitility + awaitility + test + + + + org.mockito + mockito-core + test + + + + org.assertj + assertj-core + test + + + \ No newline at end of file diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/NodeResolverImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/NodeResolverImpl.java new file mode 100644 index 000000000..09b320bfc --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/NodeResolverImpl.java @@ -0,0 +1,102 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.metadata; + +import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver; +import java.net.InetAddress; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; + +public class NodeResolverImpl implements NodeResolver +{ + private final ConcurrentMap addressToNodeMap = new ConcurrentHashMap<>(); + private final ConcurrentMap idToNodeMap = new ConcurrentHashMap<>(); + + private final CqlSession session; + + public NodeResolverImpl(final CqlSession aSession) + { + this.session = aSession; + } + + @Override + public final Optional fromIp(final InetAddress inetAddress) + { + DriverNode node = addressToNodeMap.get(inetAddress); + + if (node == null) + { + node = addressToNodeMap.computeIfAbsent(inetAddress, address -> lookup(inetAddress)); + } + else if (!inetAddress.equals(node.getPublicAddress())) + { + // IP mapping is wrong, we should remove the old entry and retry + addressToNodeMap.remove(inetAddress, node); + return fromIp(inetAddress); + } + + return Optional.ofNullable(node); + } + + @Override + public final Optional fromUUID(final UUID nodeId) + { + return Optional.ofNullable(resolve(nodeId)); + } + + private DriverNode resolve(final UUID nodeId) + { + DriverNode node = idToNodeMap.get(nodeId); + if (node == null) + { + node = idToNodeMap.computeIfAbsent(nodeId, this::lookup); + } + + return node; + } + + private DriverNode lookup(final UUID nodeId) + { + Metadata metadata = session.getMetadata(); + for (Node node : metadata.getNodes().values()) + { + if (node.getHostId().equals(nodeId)) + { + return new DriverNode(node); + } + } + return null; + } + + private DriverNode lookup(final InetAddress inetAddress) + { + Metadata metadata = session.getMetadata(); + for (Node node : metadata.getNodes().values()) + { + if (node.getBroadcastAddress().get().getAddress().equals(inetAddress)) + { + return resolve(node.getHostId()); + } + } + return null; + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/package-info.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/package-info.java new file mode 100644 index 000000000..9d80d660d --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the implementations and resources for mapping node metadata. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.metadata; diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/ReplicationStateImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/ReplicationStateImpl.java new file mode 100644 index 000000000..adc108ade --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/ReplicationStateImpl.java @@ -0,0 +1,255 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.state; + +import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver; +import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange; +import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.TokenMap; +import com.datastax.oss.driver.api.core.metadata.token.TokenRange; +import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.ericsson.bss.cassandra.ecchronos.core.metadata.Metadata.quoteIfNeeded; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +/** + * Utility class to generate a token -> replicas map for a specific table. + */ +public class ReplicationStateImpl implements ReplicationState +{ + private static final Logger LOG = LoggerFactory.getLogger(ReplicationStateImpl.class); + + private static final Map>> + KEYSPACE_REPLICATION_CACHE = new ConcurrentHashMap<>(); + private static final Map>> + CLUSTER_WIDE_KEYSPACE_REPLICATION_CACHE = new ConcurrentHashMap<>(); + + private final NodeResolver myNodeResolver; + private final CqlSession mySession; + + public ReplicationStateImpl(final NodeResolver nodeResolver, final CqlSession session) + { + myNodeResolver = nodeResolver; + mySession = session; + } + + /** + * {@inheritDoc} + */ + @Override + public ImmutableSet getNodes( + final TableReference tableReference, + final LongTokenRange tokenRange, + final Node currentNode) + { + String keyspace = tableReference.getKeyspace(); + + ImmutableMap> replication = maybeRenew(keyspace, currentNode); + return getNodes(replication, tokenRange); + } + + /** + * {@inheritDoc} + */ + @Override + public ImmutableSet getReplicas( + final TableReference tableReference, + final Node currentNode) + { + Map> tokens = getTokenRangeToReplicas(tableReference, currentNode); + Set allReplicas = new HashSet<>(); + for (ImmutableSet replicas : tokens.values()) + { + allReplicas.addAll(replicas); + } + return ImmutableSet.copyOf(allReplicas); + } + + /** + * {@inheritDoc} + */ + @Override + public ImmutableSet getNodesClusterWide( + final TableReference tableReference, + final LongTokenRange tokenRange, + final Node currentNode) + { + String keyspace = tableReference.getKeyspace(); + + ImmutableMap> replication = maybeRenewClusterWide(keyspace, currentNode); + return getNodes(replication, tokenRange); + } + + private ImmutableSet getNodes(final ImmutableMap> replication, + final LongTokenRange tokenRange) + { + ImmutableSet nodes = replication.get(tokenRange); + + if (nodes == null) + { + for (Map.Entry> entry : replication.entrySet()) + { + if (entry.getKey().isCovering(tokenRange)) + { + nodes = entry.getValue(); + break; + } + } + } + + return nodes; + } + + /** + * Get token ranges to replicas. + * + * @param tableReference + * The table used to calculate the proper replication. + * @return Nodes and their ranges + */ + @Override + public Map> getTokenRangeToReplicas( + final TableReference tableReference, + final Node currentNode) + { + String keyspace = tableReference.getKeyspace(); + return maybeRenew(keyspace, currentNode); + } + + private ImmutableMap> maybeRenew( + final String keyspace, + final Node currentNode) + { + ImmutableMap> replication = buildTokenMap( + keyspace, + false, + currentNode); + + return KEYSPACE_REPLICATION_CACHE.compute(keyspace, (k, v) -> !replication.equals(v) ? replication : v); + } + + /** + * Get token ranges. + * + * @param tableReference Table reference. + * @return Nodes and their ranges + */ + @Override + public Map> getTokenRanges( + final TableReference tableReference, + final Node currentNode) + { + String keyspace = tableReference.getKeyspace(); + return maybeRenewClusterWide(keyspace, currentNode); + } + + private ImmutableMap> maybeRenewClusterWide( + final String keyspace, + final Node currentNode) + { + ImmutableMap> replication = buildTokenMap( + keyspace, + true, + currentNode); + + return CLUSTER_WIDE_KEYSPACE_REPLICATION_CACHE + .compute(keyspace, (k, v) -> !replication.equals(v) ? replication : v); + } + + private ImmutableMap> buildTokenMap( + final String keyspace, + final boolean clusterWide, + final Node currentNode) + { + ImmutableMap.Builder> replicationBuilder = ImmutableMap.builder(); + Map, ImmutableSet> replicaCache = new HashMap<>(); + Metadata metadata = mySession.getMetadata(); + Optional tokenMap = metadata.getTokenMap(); + if (!tokenMap.isPresent()) + { + throw new IllegalStateException("Cannot determine ranges, is metadata/tokenMap disabled?"); + } + String keyspaceName = quoteIfNeeded(keyspace); + Set tokenRanges; + if (clusterWide) + { + tokenRanges = tokenMap.get().getTokenRanges(); + } + else + { + tokenRanges = tokenMap.get().getTokenRanges(keyspaceName, currentNode); + } + for (TokenRange tokenRange : tokenRanges) + { + LongTokenRange longTokenRange = convert(tokenRange); + ImmutableSet replicas + = replicaCache.computeIfAbsent(tokenMap.get().getReplicas(keyspaceName, tokenRange), this::convert); + + replicationBuilder.put(longTokenRange, replicas); + } + + return replicationBuilder.build(); + } + + private ImmutableSet convert(final Set nodes) + { + ImmutableSet.Builder builder = new ImmutableSet.Builder<>(); + for (Node node : nodes) + { + Optional broadcastAddress = node.getBroadcastAddress(); + if (broadcastAddress.isPresent()) + { + Optional resolvedNode = myNodeResolver.fromIp(broadcastAddress.get().getAddress()); + if (resolvedNode.isPresent()) + { + builder.add(resolvedNode.get()); + } + else + { + LOG.warn("Node {} - {} not found in node resolver", node.getHostId(), broadcastAddress.get()); + } + } + else + { + LOG.warn("Could not determine broadcast address for node {}", node.getHostId()); + } + } + return builder.build(); + } + + private LongTokenRange convert(final TokenRange range) + { + // Assuming murmur3 partitioner + long start = ((Murmur3Token) range.getStart()).getValue(); + long end = ((Murmur3Token) range.getEnd()).getValue(); + return new LongTokenRange(start, end); + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/package-info.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/package-info.java new file mode 100644 index 000000000..860498539 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the implementation and resources for stateful declarations. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.state; diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/TestNodeResolverImpl.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/TestNodeResolverImpl.java new file mode 100644 index 000000000..469170020 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/metadata/TestNodeResolverImpl.java @@ -0,0 +1,169 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.metadata; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestNodeResolverImpl +{ + @Mock + private Metadata mockMetadata; + + @Mock + private CqlSession mockCqlSession; + + private Map mockedNodes = new HashMap<>(); + + private NodeResolver nodeResolver; + + @Before + public void setup() throws Exception + { + when(mockMetadata.getNodes()).thenReturn(mockedNodes); + + // Add two dummy hosts so that we know we find the correct host + addNode(new InetSocketAddress(address("127.0.0.2"), 9042), "dc1"); + addNode(new InetSocketAddress(address("127.0.0.3"), 9042), "dc1"); + + when(mockCqlSession.getMetadata()).thenReturn(mockMetadata); + + nodeResolver = new NodeResolverImpl(mockCqlSession); + } + + @Test + public void testGetHost() throws Exception + { + Node node = addNode(new InetSocketAddress(address("127.0.0.1"), 9042), "dc1"); + + Optional maybeNode = nodeResolver.fromIp(address("127.0.0.1")); + assertThat(maybeNode).isPresent(); + assertThat(maybeNode.get().getId()).isEqualTo(node.getHostId()); + assertThat(maybeNode.get().getPublicAddress()).isEqualTo(address("127.0.0.1")); + assertThat(maybeNode.get().getDatacenter()).isEqualTo("dc1"); + + assertThat(nodeResolver.fromIp(address("127.0.0.1"))).containsSame(maybeNode.get()); + assertThat(nodeResolver.fromUUID(node.getHostId())).containsSame(maybeNode.get()); + } + + @Test + public void testChangeIpAddress() throws Exception + { + Node node = addNode(new InetSocketAddress(address("127.0.0.1"), 9042), "dc1"); + + Optional maybeNode = nodeResolver.fromIp(address("127.0.0.1")); + assertThat(maybeNode).isPresent(); + + assertThat(maybeNode.get().getPublicAddress()).isEqualTo(address("127.0.0.1")); + + when(node.getBroadcastAddress()).thenReturn(Optional.of(new InetSocketAddress(address("127.0.0.5"), 9042))); + + assertThat(maybeNode.get().getId()).isEqualTo(node.getHostId()); + assertThat(maybeNode.get().getPublicAddress()).isEqualTo(address("127.0.0.5")); + assertThat(maybeNode.get().getDatacenter()).isEqualTo("dc1"); + + // New mapping for the node + assertThat(nodeResolver.fromIp(address("127.0.0.5"))).containsSame(maybeNode.get()); + assertThat(nodeResolver.fromUUID(node.getHostId())).containsSame(maybeNode.get()); + + // Make sure the old mapping is removed + assertThat(nodeResolver.fromIp(address("127.0.0.1"))).isEmpty(); + } + + @Test + public void testChangeIpAddressAndAddNewReplica() throws Exception + { + Node node = addNode(new InetSocketAddress(address("127.0.0.1"), 9042), "dc1"); + + Optional maybeNode = nodeResolver.fromIp(address("127.0.0.1")); + assertThat(maybeNode).isPresent(); + + assertThat(maybeNode.get().getPublicAddress()).isEqualTo(address("127.0.0.1")); + + when(node.getBroadcastAddress()).thenReturn(Optional.of(new InetSocketAddress(address("127.0.0.5"), 9042))); + + assertThat(maybeNode.get().getId()).isEqualTo(node.getHostId()); + assertThat(maybeNode.get().getPublicAddress()).isEqualTo(address("127.0.0.5")); + assertThat(maybeNode.get().getDatacenter()).isEqualTo("dc1"); + + // New mapping for the node + assertThat(nodeResolver.fromIp(address("127.0.0.5"))).containsSame(maybeNode.get()); + assertThat(nodeResolver.fromUUID(node.getHostId())).containsSame(maybeNode.get()); + + // If a new node is using the old ip we should return it + Node newNode = addNode(new InetSocketAddress(address("127.0.0.1"), 9042), "dc2"); + + Optional maybeNewNode = nodeResolver.fromIp(address("127.0.0.1")); + assertThat(maybeNewNode).isPresent(); + + assertThat(maybeNewNode.get().getId()).isEqualTo(newNode.getHostId()); + assertThat(maybeNewNode.get().getPublicAddress()).isEqualTo(address("127.0.0.1")); + assertThat(maybeNewNode.get().getDatacenter()).isEqualTo("dc2"); + assertThat(nodeResolver.fromUUID(newNode.getHostId())).containsSame(maybeNewNode.get()); + + assertThat(maybeNewNode.get()).isNotSameAs(maybeNode.get()); + } + + @Test + public void testGetNonExistingHost() throws Exception + { + Optional maybeNode = nodeResolver.fromIp(address("127.0.0.1")); + assertThat(maybeNode).isEmpty(); + + maybeNode = nodeResolver.fromUUID(UUID.randomUUID()); + assertThat(maybeNode).isEmpty(); + } + + private InetAddress address(String address) throws UnknownHostException + { + return InetAddress.getByName(address); + } + + private Node addNode(InetSocketAddress broadcastAddress, String dataCenter) + { + Node node = mock(Node.class); + + UUID id = UUID.randomUUID(); + when(node.getHostId()).thenReturn(id); + when(node.getBroadcastAddress()).thenReturn(Optional.of(broadcastAddress)); + when(node.getDatacenter()).thenReturn(dataCenter); + + mockedNodes.put(id, node); + when(mockMetadata.getNodes()).thenReturn(mockedNodes); + return node; + } +} diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TestReplicationStateImpl.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TestReplicationStateImpl.java new file mode 100644 index 000000000..4fbe4cd1c --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TestReplicationStateImpl.java @@ -0,0 +1,407 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.state; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.TokenMap; +import com.datastax.oss.driver.api.core.metadata.token.TokenRange; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver; +import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange; +import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Optional; + + +import static com.ericsson.bss.cassandra.ecchronos.core.impl.table.MockTableReferenceFactory.tableReference; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TestReplicationStateImpl +{ + @Mock + private NodeResolver mockNodeResolver; + + @Mock + private CqlSession mockSession; + + @Mock + private Metadata mockMetadata; + + @Mock + private TokenMap mockTokenMap; + + @Mock + private Node mockReplica1; + + @Mock + private Node mockReplica2; + + @Mock + private Node mockReplica3; + + @Mock + private Node mockReplica4; + + @Mock + private DriverNode mockNode1; + + @Mock + private DriverNode mockNode2; + + @Mock + private DriverNode mockNode3; + + @Mock + private DriverNode mockNode4; + + @Before + public void setup() throws Exception + { + InetAddress address1 = InetAddress.getByName("127.0.0.1"); + InetSocketAddress address11 = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9042); + InetAddress address2 = InetAddress.getByName("127.0.0.2"); + InetSocketAddress address22 = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 9042); + InetAddress address3 = InetAddress.getByName("127.0.0.3"); + InetSocketAddress address33 = new InetSocketAddress(InetAddress.getByName("127.0.0.3"), 9042); + InetAddress address4 = InetAddress.getByName("127.0.0.4"); + InetSocketAddress address44 = new InetSocketAddress(InetAddress.getByName("127.0.0.4"), 9042); + + when(mockReplica1.getBroadcastAddress()).thenReturn(Optional.of(address11)); + when(mockReplica2.getBroadcastAddress()).thenReturn(Optional.of(address22)); + when(mockReplica3.getBroadcastAddress()).thenReturn(Optional.of(address33)); + when(mockReplica4.getBroadcastAddress()).thenReturn(Optional.of(address44)); + + when(mockNodeResolver.fromIp(eq(address1))).thenReturn(Optional.of(mockNode1)); + when(mockNodeResolver.fromIp(eq(address2))).thenReturn(Optional.of(mockNode2)); + when(mockNodeResolver.fromIp(eq(address3))).thenReturn(Optional.of(mockNode3)); + when(mockNodeResolver.fromIp(eq(address4))).thenReturn(Optional.of(mockNode4)); + + when(mockMetadata.getTokenMap()).thenReturn(Optional.of(mockTokenMap)); + when(mockSession.getMetadata()).thenReturn(mockMetadata); + } + + @Test + public void testGetTokenRangeToReplicaSingleToken() throws Exception + { + LongTokenRange range1 = new LongTokenRange(1, 2); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange = TokenUtil.getRange(1, 2); + + doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(eq("ks"), eq(mockReplica1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + Map> tokenRangeToReplicas = replicationState.getTokenRangeToReplicas( + tableReference, mockReplica1); + + assertThat(tokenRangeToReplicas.keySet()).containsExactlyInAnyOrder(range1); + assertThat(tokenRangeToReplicas.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3); + + assertThat(replicationState.getNodes(tableReference, range1, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range1)); + } + + @Test + public void testGetTokenRangeToReplica() throws Exception + { + LongTokenRange range1 = new LongTokenRange(1, 2); + LongTokenRange range2 = new LongTokenRange(2, 3); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange1 = TokenUtil.getRange(1, 2); + TokenRange tokenRange2 = TokenUtil.getRange(2, 3); + + doReturn(Sets.newHashSet(tokenRange1, tokenRange2)).when(mockTokenMap) + .getTokenRanges(eq("ks"), eq(mockReplica1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica3)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange2)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + Map> tokenRangeToReplicas = replicationState.getTokenRangeToReplicas( + tableReference, mockReplica1); + + assertThat(tokenRangeToReplicas.keySet()).containsExactlyInAnyOrder(range1, range2); + assertThat(tokenRangeToReplicas.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2); + assertThat(tokenRangeToReplicas.get(range2)).containsExactlyInAnyOrder(mockNode1, mockNode3); + + assertThat(replicationState.getNodes(tableReference, range1, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range1)); + assertThat(replicationState.getNodes(tableReference, range2, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range2)); + } + + @Test + public void testGetReplicas() throws Exception + { + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange1 = TokenUtil.getRange(1, 2); + TokenRange tokenRange2 = TokenUtil.getRange(2, 3); + + doReturn(Sets.newHashSet(tokenRange1, tokenRange2)).when(mockTokenMap) + .getTokenRanges(eq("ks"), eq(mockReplica1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica3)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange2)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + ImmutableSet replicas = replicationState.getReplicas(tableReference, mockReplica1); + + assertThat(replicas).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3); + } + + @Test + public void testGetTokenRangeToReplicaSetReuse() throws Exception + { + LongTokenRange range1 = new LongTokenRange(1, 2); + LongTokenRange range2 = new LongTokenRange(2, 3); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange1 = TokenUtil.getRange(1, 2); + TokenRange tokenRange2 = TokenUtil.getRange(2, 3); + + doReturn(Sets.newHashSet(tokenRange1, tokenRange2)).when(mockTokenMap) + .getTokenRanges(eq("ks"), eq(mockReplica1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2)).when(mockTokenMap).getReplicas(eq("ks"), eq(tokenRange2)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + Map> tokenRangeToReplicas = replicationState.getTokenRangeToReplicas( + tableReference, mockReplica1); + + assertThat(tokenRangeToReplicas.keySet()).containsExactlyInAnyOrder(range1, range2); + assertThat(tokenRangeToReplicas.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2); + assertThat(tokenRangeToReplicas.get(range1)).isSameAs(tokenRangeToReplicas.get(range2)); + + assertThat(replicationState.getNodes(tableReference, range1, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range1)); + assertThat(replicationState.getNodes(tableReference, range2, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range2)); + } + + @Test + public void testGetTokenRangeToReplicaMapReuse() throws Exception + { + LongTokenRange range1 = new LongTokenRange(1, 2); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange = TokenUtil.getRange(1, 2); + + doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(eq("ks"), eq(mockReplica1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + Map> tokenRangeToReplicas = replicationState.getTokenRangeToReplicas( + tableReference, mockReplica1); + + assertThat(tokenRangeToReplicas.keySet()).containsExactlyInAnyOrder(range1); + assertThat(tokenRangeToReplicas.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3); + + assertThat(replicationState.getTokenRangeToReplicas(tableReference, mockReplica1)).isSameAs(tokenRangeToReplicas); + + assertThat(replicationState.getNodes(tableReference, range1, mockReplica1)).isSameAs(tokenRangeToReplicas.get(range1)); + } + + @Test + public void testGetNodesForSubRange() throws Exception + { + LongTokenRange subRange = new LongTokenRange(2, 3); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange = TokenUtil.getRange(1, 5); + + doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(eq("ks"), eq(mockReplica1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + ImmutableSet nodes = replicationState.getNodes(tableReference, subRange, mockReplica1); + + assertThat(nodes).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3); + } + + @Test + public void testGetNodesForNonExistingSubRange() throws Exception + { + LongTokenRange subRange = new LongTokenRange(6, 7); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange = TokenUtil.getRange(1, 5); + + doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(eq("ks"), eq(mockReplica1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + assertThat(replicationState.getNodes(tableReference, subRange, mockReplica1)).isNull(); + } + + @Test + public void testGetNodesForIntersectingSubRange() throws Exception + { + LongTokenRange subRange = new LongTokenRange(4, 7); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange existingRange = TokenUtil.getRange(1, 5); + TokenRange existingRange2 = TokenUtil.getRange(5, 9); + + doReturn(Sets.newHashSet(existingRange, existingRange2)).when(mockTokenMap) + .getTokenRanges(eq("ks"), eq(mockReplica1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(existingRange)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(existingRange2)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + assertThat(replicationState.getNodes(tableReference, subRange, mockReplica1)).isNull(); + } + + @Test + public void testGetNodesClusterWideForSubRange() throws Exception + { + LongTokenRange subRange = new LongTokenRange(2, 3); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange = TokenUtil.getRange(1, 5); + + doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + ImmutableSet nodes = replicationState.getNodesClusterWide(tableReference, subRange, mockReplica1); + + assertThat(nodes).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3); + } + + @Test + public void testGetNodesClusterWideForNonExistingSubRange() throws Exception + { + LongTokenRange subRange = new LongTokenRange(6, 7); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange = TokenUtil.getRange(1, 5); + + doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + assertThat(replicationState.getNodesClusterWide(tableReference, subRange, mockReplica1)).isNull(); + } + + @Test + public void testGetNodesClusterWideForIntersectingSubRange() throws Exception + { + LongTokenRange subRange = new LongTokenRange(4, 7); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange existingRange = TokenUtil.getRange(1, 5); + TokenRange existingRange2 = TokenUtil.getRange(5, 9); + + doReturn(Sets.newHashSet(existingRange, existingRange2)).when(mockTokenMap) + .getTokenRanges(); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(existingRange)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(existingRange2)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + assertThat(replicationState.getNodesClusterWide(tableReference, subRange, mockReplica1)).isNull(); + } + + @Test + public void testGetTokenRanges() throws Exception + { + LongTokenRange range1 = new LongTokenRange(1, 2); + LongTokenRange range2 = new LongTokenRange(2, 3); + LongTokenRange range3 = new LongTokenRange(3, 4); + LongTokenRange range4 = new LongTokenRange(4, 5); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange1 = TokenUtil.getRange(1, 2); + TokenRange tokenRange2 = TokenUtil.getRange(2, 3); + TokenRange tokenRange3 = TokenUtil.getRange(3, 4); + TokenRange tokenRange4 = TokenUtil.getRange(4, 5); + + doReturn(Sets.newHashSet(tokenRange1, tokenRange2, tokenRange3, tokenRange4)).when(mockTokenMap) + .getTokenRanges(); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange1)); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange2)); + doReturn(Sets.newHashSet(mockReplica2, mockReplica3, mockReplica4)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange3)); + doReturn(Sets.newHashSet(mockReplica2, mockReplica3, mockReplica4)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange4)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + Map> tokenRanges = replicationState.getTokenRanges(tableReference, mockReplica1); + + assertThat(tokenRanges.keySet()).containsExactlyInAnyOrder(range1, range2, range3, range4); + assertThat(tokenRanges.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3); + assertThat(tokenRanges.get(range2)).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3); + assertThat(tokenRanges.get(range3)).containsExactlyInAnyOrder(mockNode2, mockNode3, mockNode4); + assertThat(tokenRanges.get(range4)).containsExactlyInAnyOrder(mockNode2, mockNode3, mockNode4); + } + + @Test + public void testGetTokenRangesReuse() throws Exception + { + LongTokenRange range1 = new LongTokenRange(1, 2); + TableReference tableReference = tableReference("ks", "tb"); + + TokenRange tokenRange = TokenUtil.getRange(1, 2); + + doReturn(Sets.newHashSet(tokenRange)).when(mockTokenMap).getTokenRanges(); + doReturn(Sets.newHashSet(mockReplica1, mockReplica2, mockReplica3)).when(mockTokenMap) + .getReplicas(eq("ks"), eq(tokenRange)); + + ReplicationState replicationState = new ReplicationStateImpl(mockNodeResolver, mockSession); + + Map> tokenRanges = replicationState.getTokenRanges(tableReference, mockReplica1); + + assertThat(tokenRanges.keySet()).containsExactlyInAnyOrder(range1); + assertThat(tokenRanges.get(range1)).containsExactlyInAnyOrder(mockNode1, mockNode2, mockNode3); + + assertThat(replicationState.getTokenRanges(tableReference, mockReplica1)).isSameAs(tokenRanges); + } +} diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TokenUtil.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TokenUtil.java new file mode 100644 index 000000000..80ec56dd4 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/state/TokenUtil.java @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.state; + +import com.datastax.oss.driver.api.core.metadata.token.TokenRange; +import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token; +import com.datastax.oss.driver.internal.core.metadata.token.Murmur3TokenFactory; +import com.datastax.oss.driver.internal.core.metadata.token.TokenFactory; + +public class TokenUtil +{ + public static TokenRange getRange(long start, long end) throws Exception + { + TokenFactory tokenFactory = new Murmur3TokenFactory(); + return tokenFactory.range(new Murmur3Token(start), new Murmur3Token(end)); + } +} diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/MockTableReferenceFactory.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/MockTableReferenceFactory.java new file mode 100644 index 000000000..087b17852 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/MockTableReferenceFactory.java @@ -0,0 +1,199 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.table; + +import com.ericsson.bss.cassandra.ecchronos.core.exceptions.EcChronosException; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; + +public class MockTableReferenceFactory implements TableReferenceFactory +{ + public static final int DEFAULT_GC_GRACE_SECONDS = 7200; + private static final ConcurrentMap tableReferences = new ConcurrentHashMap<>(); + + @Override + public TableReference forTable(String keyspace, String table) + { + return tableReference(keyspace, table); + } + + @Override + public TableReference forTable(TableMetadata table) + { + return tableReference(table); + } + + @Override + public Set forKeyspace(String keyspace) throws EcChronosException + { + Set tableReferences = new HashSet<>(); + tableReferences.add(tableReference(keyspace, "table1")); + tableReferences.add(tableReference(keyspace, "table2")); + tableReferences.add(tableReference(keyspace, "table3")); + return tableReferences; + } + + @Override + public Set forCluster() + { + Set tableReferences = new HashSet<>(); + tableReferences.add(tableReference("keyspace1", "table1")); + tableReferences.add(tableReference("keyspace1", "table2")); + tableReferences.add(tableReference("keyspace1", "table3")); + tableReferences.add(tableReference("keyspace2", "table4")); + tableReferences.add(tableReference("keyspace3", "table5")); + return tableReferences; + } + + public static TableReference tableReference(String keyspace, String table) + { + return tableReference(keyspace, table, DEFAULT_GC_GRACE_SECONDS); + } + + public static TableReference tableReference(String keyspace, String table, int gcGraceSeconds) + { + TableKey tableKey = new TableKey(keyspace, table); + TableReference tableReference = tableReferences.get(tableKey); + if (tableReference == null) + { + tableReference = tableReferences.computeIfAbsent(tableKey, + tb -> new MockTableReference(UUID.randomUUID(), keyspace, table, gcGraceSeconds)); + } + + return tableReference; + } + + public static TableReference tableReference(TableMetadata table) + { + return new MockTableReference(table); + } + + static class MockTableReference implements TableReference + { + private final UUID id; + private final String keyspace; + private final String table; + private final int gcGraceSeconds; + + MockTableReference(UUID id, String keyspace, String table) + { + this(id, keyspace, table, DEFAULT_GC_GRACE_SECONDS); + } + + MockTableReference(TableMetadata tableMetadata) + { + this(tableMetadata.getId().get(), tableMetadata.getKeyspace().asInternal(), + tableMetadata.getName().asInternal(), + (int) tableMetadata.getOptions().get(CqlIdentifier.fromInternal("gc_grace_seconds"))); + } + + MockTableReference(UUID id, String keyspace, String table, int gcGraceSeconds) + { + this.id = id; + this.keyspace = keyspace; + this.table = table; + this.gcGraceSeconds = gcGraceSeconds; + } + + @Override + public UUID getId() + { + return id; + } + + @Override + public String getTable() + { + return table; + } + + @Override + public String getKeyspace() + { + return keyspace; + } + + @Override + public int getGcGraceSeconds() + { + return gcGraceSeconds; + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + MockTableReference that = (MockTableReference) o; + return id.equals(that.id) && + keyspace.equals(that.keyspace) && + table.equals(that.table); + } + + @Override + public int hashCode() + { + return Objects.hash(id, keyspace, table); + } + + @Override + public String toString() + { + return String.format("%s.%s (mock)", keyspace, table); + } + } + + static class TableKey + { + private final String keyspace; + private final String table; + + TableKey(String keyspace, String table) + { + this.keyspace = keyspace; + this.table = table; + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TableKey tableKey = (TableKey) o; + return keyspace.equals(tableKey.keyspace) && + table.equals(tableKey.table); + } + + @Override + public int hashCode() + { + return Objects.hash(keyspace, table); + } + } +} + diff --git a/core/pom.xml b/core/pom.xml new file mode 100644 index 000000000..89900e10f --- /dev/null +++ b/core/pom.xml @@ -0,0 +1,90 @@ + + + + 4.0.0 + + com.ericsson.bss.cassandra.ecchronos + agent + 1.0.0-SNAPSHOT + + + core + + + + + com.datastax.oss + java-driver-core + + + + com.datastax.oss + java-driver-query-builder + + + + com.google.guava + guava + + + + com.github.ben-manes.caffeine + caffeine + + + + + org.junit.vintage + junit-vintage-engine + test + + + + commons-io + commons-io + test + + + + org.awaitility + awaitility + test + + + + org.mockito + mockito-core + test + + + + org.assertj + assertj-core + test + + + + nl.jqno.equalsverifier + equalsverifier + test + + + + \ No newline at end of file diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/EcChronosException.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/EcChronosException.java new file mode 100644 index 000000000..1c468e839 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/EcChronosException.java @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.exceptions; + +/** + * Generic exception thrown by schedulers to signal that something went wrong. + */ +public class EcChronosException extends Exception +{ + private static final long serialVersionUID = 1148561336907867613L; + + public EcChronosException(final String message) + { + super(message); + } + + public EcChronosException(final Throwable t) + { + super(t); + } + + public EcChronosException(final String message, final Throwable t) + { + super(message, t); + } +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/package-info.java new file mode 100644 index 000000000..423bced6e --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/exceptions/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains custom ecChronos exceptions. + */ +package com.ericsson.bss.cassandra.ecchronos.core.exceptions; diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/DriverNode.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/DriverNode.java new file mode 100644 index 000000000..6399e7824 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/DriverNode.java @@ -0,0 +1,96 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.metadata; + +import com.datastax.oss.driver.api.core.metadata.Node; + +import java.net.InetAddress; +import java.util.Objects; +import java.util.UUID; + +/** + * An internal representation of a node. + * This class together with {@link com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver} makes it easier to + * translate node IP to host ID and other way around. + */ +public class DriverNode +{ + private final Node node; + + public DriverNode(final Node aNode) + { + this.node = aNode; + } + + /** + * Get the host id of the node. + * + * @return The host id of the node. + */ + public UUID getId() + { + return node.getHostId(); + } + + /** + * Get the public ip address of the node. + * + * @return The public ip address of the node. + */ + public InetAddress getPublicAddress() + { + return node.getBroadcastAddress().get().getAddress(); + } + + /** + * Get the datacenter the node resides in. + * + * @return The datacenter of the node. + */ + public String getDatacenter() + { + return node.getDatacenter(); + } + + /** + * Check for equality. + */ + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + DriverNode that = (DriverNode) o; + return node.equals(that.node); + } + + @Override + public final int hashCode() + { + return Objects.hash(node); + } + + @Override + public final String toString() + { + return String.format("Node(%s:%s:%s)", getId(), getDatacenter(), getPublicAddress()); + } +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/Metadata.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/Metadata.java new file mode 100644 index 000000000..6dcc06425 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/Metadata.java @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.metadata; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.datastax.oss.driver.internal.core.util.Strings; + +import java.util.Optional; + +/** + * Helper class to retrieve keyspace and table metadata, + * this should be preferred than doing session.getMetadata().getKeyspace(name) or keyspaceMetadata.getTable(name) + * Main purpose is to not have to care if the keyspace/table string representation is quoted or not. + * In driver, keyspaces/tables with camelCase needs to be quoted. + */ +public final class Metadata +{ + private Metadata() + { + //Intentionally left empty + } + + public static Optional getKeyspace(final CqlSession session, final String keyspace) + { + String keyspaceName = quoteIfNeeded(keyspace); + return session.getMetadata().getKeyspace(keyspaceName); + } + + public static Optional getTable(final KeyspaceMetadata keyspaceMetadata, final String table) + { + String tableName = quoteIfNeeded(table); + return keyspaceMetadata.getTable(tableName); + } + + public static String quoteIfNeeded(final String keyspaceOrTable) + { + return Strings.needsDoubleQuotes(keyspaceOrTable) && !Strings.isDoubleQuoted(keyspaceOrTable) + ? Strings.doubleQuote(keyspaceOrTable) + : keyspaceOrTable; + } +} + diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/NodeResolver.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/NodeResolver.java new file mode 100644 index 000000000..0d7d8b9c6 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/NodeResolver.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.metadata; + +import java.net.InetAddress; +import java.util.Optional; +import java.util.UUID; + +/** + * Node resolver interface. + */ +public interface NodeResolver +{ + /** + * Retrieve a node based on public ip address. + * + * @param inetAddress The public ip address of the node instance. + * @return The node. + */ + Optional fromIp(InetAddress inetAddress); + + Optional fromUUID(UUID nodeId); +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/package-info.java new file mode 100644 index 000000000..173afadf7 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the Interfaces and resources for mapping node metadata. + */ +package com.ericsson.bss.cassandra.ecchronos.core.metadata; diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/LongTokenRange.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/LongTokenRange.java new file mode 100644 index 000000000..26bdb2caa --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/LongTokenRange.java @@ -0,0 +1,142 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.state; + +import java.math.BigInteger; + +/** + * A representation of a token range in Cassandra. + */ +@SuppressWarnings("VisibilityModifier") +public class LongTokenRange +{ + private static final int HASH_THIRTYONE = 31; + private static final int HASH_THIRTYTWO = 32; + + private static final int LONG_VALUE_BITS = 64; + + public static final BigInteger RANGE_END = + BigInteger.valueOf(2).pow(LONG_VALUE_BITS - 1).subtract(BigInteger.ONE); // Long.MAX_VALUE + public static final BigInteger FULL_RANGE = + BigInteger.valueOf(2).pow(LONG_VALUE_BITS); + + public final long start; + public final long end; + + public LongTokenRange(final long aStart, final long anEnd) + { + this.start = aStart; + this.end = anEnd; + } + + /** + * Check if the token range is wrapping around. + * + * @return True in case this token range wraps around. + */ + public boolean isWrapAround() + { + return start >= end; + } + + /** + * Calculate the size of the token range. + * + * @return The size of the token range. + */ + public BigInteger rangeSize() + { + BigInteger tokenStart = BigInteger.valueOf(start); + BigInteger tokenEnd = BigInteger.valueOf(end); + + BigInteger rangeSize = tokenEnd.subtract(tokenStart); + + if (rangeSize.compareTo(BigInteger.ZERO) <= 0) + { + rangeSize = rangeSize.add(FULL_RANGE); + } + + return rangeSize; + } + + /** + * Check if this range covers the other range. + *

+ * The range (I, J] covers (K, L] if: + *
+ * I <= K and J >= L if either both are wrapping or not wrapping. + *
+ * I <= K or J >= L if this is wrapping. + * + * @param other The token range to check if this is covering. + * @return True if this token range covers the provided token range. + */ + public boolean isCovering(final LongTokenRange other) + { + boolean thisWraps = isWrapAround(); + boolean otherWraps = other.isWrapAround(); + + if (thisWraps == otherWraps) + { + // Normal case - are we including the other range + return start <= other.start && end >= other.end; + } + else if (thisWraps) + { + // If only this wraps we cover it if either: + // start is before the other start + // end is after the other end + return this.start <= other.start || this.end >= other.end; + } + + // If the other wraps but we don't we can't possibly cover it + return false; + } + + @Override + public final String toString() + { + return String.format("(%s,%s]", start, end); + } + + @Override + public final boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + LongTokenRange that = (LongTokenRange) o; + + if (start != that.start) + { + return false; + } + return end == that.end; + } + + @Override + public final int hashCode() + { + int result = (int) (start ^ (start >>> HASH_THIRTYTWO)); + result = HASH_THIRTYONE * result + (int) (end ^ (end >>> HASH_THIRTYTWO)); + return result; + } +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/ReplicationState.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/ReplicationState.java new file mode 100644 index 000000000..019596e73 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/ReplicationState.java @@ -0,0 +1,69 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.state; + +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import java.util.Map; + +import com.google.common.collect.ImmutableSet; + +/** + * Replication state interface used to retrieve mappings between token range to responsible nodes. + * + * Within a keyspace the methods are expected to return the exact same object instance for a set of nodes. + */ +public interface ReplicationState +{ + /** + * Get the nodes that are responsible for the provided token range. + * The provided token range can be a sub range of an existing one. + * + * @param tableReference The table used to calculate the proper replication. + * @param tokenRange The token range to get nodes for. + * @return The responsible nodes or null if either the token range does not exist or is intersecting two ranges. + */ + ImmutableSet getNodes(TableReference tableReference, LongTokenRange tokenRange, Node currentNode); + + /** + * Get the nodes that are a replica for the provided table that have ranges in common with the local node. + * + * @param tableReference The table to fetch replicas for. + * @return The replicas for the table + */ + ImmutableSet getReplicas(TableReference tableReference, Node currentNode); + + /** + * Get the nodes that are responsible for the provided token range, check clusterwide. + * The provided token range can be a sub range of an existing one. + * + * @param tableReference The table used to calculate the proper replication. + * @param tokenRange The token range to get nodes for. + * @return The responsible nodes or null if either the token range does not exist or is intersecting two ranges. + */ + ImmutableSet getNodesClusterWide(TableReference tableReference, LongTokenRange tokenRange, Node currentNode); + + /** + * Get a map of the current replication state for the provided table. + * + * @param tableReference + * The table used to calculate the proper replication. + * @return The map consisting of token -> responsible nodes. + */ + Map> getTokenRangeToReplicas(TableReference tableReference, Node currentNode); + + Map> getTokenRanges(TableReference tableReference, Node currentNode); +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/package-info.java new file mode 100644 index 000000000..34d4750dd --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/state/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the interfaces and resources for stateful declarations. + */ +package com.ericsson.bss.cassandra.ecchronos.core.state; diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReference.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReference.java new file mode 100644 index 000000000..bd90ab7f2 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReference.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.table; + +import java.util.UUID; + +/** + * An interface containing keyspace/table mapping to avoid passing around two strings to refer to one specific table. + */ +public interface TableReference +{ + UUID getId(); + + String getTable(); + + String getKeyspace(); + + int getGcGraceSeconds(); +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReferenceFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReferenceFactory.java new file mode 100644 index 000000000..8c9f08750 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/TableReferenceFactory.java @@ -0,0 +1,58 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.table; + +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.ericsson.bss.cassandra.ecchronos.core.exceptions.EcChronosException; +import java.util.Set; + +/** + * A factory that generates table references. + */ +public interface TableReferenceFactory +{ + /** + * Get a table reference for the provided keyspace/table pair. + * + * @param keyspace The keyspace name. + * @param table The table name. + * @return A table reference for the provided keyspace/table pair or null if table does not exist. + */ + TableReference forTable(String keyspace, String table); + + /** + * Get a table reference for the provided TableMetadata. + * + * @param table the TableMetadata. + * @return A table reference for the provided keyspace/table pair. + */ + TableReference forTable(TableMetadata table); + + /** + * Get all table references in keyspace. + * + * @param keyspace The keyspace name + * @throws com.ericsson.bss.cassandra.ecchronos.core.exceptions.EcChronosException if keyspace does not exist. + * @return A unique set of all table references for a specific keyspace. + */ + Set forKeyspace(String keyspace) throws EcChronosException; + + /** + * Get all table references for a cluster (all keyspaces, all tables). + * + * @return A unique set of all table references for the cluster. + */ + Set forCluster(); +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/package-info.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/package-info.java new file mode 100644 index 000000000..7d66e62ed --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/table/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Contains the interfaces and resources for tables and keyspace. + */ +package com.ericsson.bss.cassandra.ecchronos.core.table; diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/TestMetadata.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/TestMetadata.java new file mode 100644 index 000000000..a3992ba92 --- /dev/null +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/metadata/TestMetadata.java @@ -0,0 +1,100 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.metadata; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class TestMetadata +{ + @Mock + CqlSession cqlSessionMock; + + @Mock + com.datastax.oss.driver.api.core.metadata.Metadata metadataMock; + + @Mock + KeyspaceMetadata keyspaceMetadataMock; + + @Mock + TableMetadata tableMetadataMock; + + @Before + public void setup() + { + when(cqlSessionMock.getMetadata()).thenReturn(metadataMock); + } + + @Test + public void testGetKeyspace() + { + String keyspace = "keyspace1"; + when(metadataMock.getKeyspace(eq(keyspace))).thenReturn(Optional.of(keyspaceMetadataMock)); + assertThat(Metadata.getKeyspace(cqlSessionMock, keyspace)).isNotEmpty(); + } + + @Test + public void testGetKeyspaceWithCamelCase() + { + String keyspace = "keyspace1WithCamelCase"; + when(metadataMock.getKeyspace(eq("\""+keyspace+"\""))).thenReturn(Optional.of(keyspaceMetadataMock)); + assertThat(Metadata.getKeyspace(cqlSessionMock, keyspace)).isNotEmpty(); + } + + @Test + public void testGetKeyspaceWithCamelCaseAlreadyQuoted() + { + String keyspace = "keyspace1WithCamelCase"; + when(metadataMock.getKeyspace(eq("\""+keyspace+"\""))).thenReturn(Optional.of(keyspaceMetadataMock)); + assertThat(Metadata.getKeyspace(cqlSessionMock, "\""+keyspace+"\"")).isNotEmpty(); + } + + @Test + public void testGetTable() + { + String table = "table1"; + when(keyspaceMetadataMock.getTable(eq(table))).thenReturn(Optional.of(tableMetadataMock)); + assertThat(Metadata.getTable(keyspaceMetadataMock, table)).isNotEmpty(); + } + + @Test + public void testGetTableWithCamelCase() + { + String table = "table1WithCamelCase"; + when(keyspaceMetadataMock.getTable(eq("\""+table+"\""))).thenReturn(Optional.of(tableMetadataMock)); + assertThat(Metadata.getTable(keyspaceMetadataMock, table)).isNotEmpty(); + } + + @Test + public void testGetTableWithCamelCaseAlreadyQuoted() + { + String table = "table1WithCamelCase"; + when(keyspaceMetadataMock.getTable(eq("\""+table+"\""))).thenReturn(Optional.of(tableMetadataMock)); + assertThat(Metadata.getTable(keyspaceMetadataMock, "\""+table+"\"")).isNotEmpty(); + } +} diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/state/TestLongTokenRange.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/state/TestLongTokenRange.java new file mode 100644 index 000000000..ff706ec5a --- /dev/null +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/state/TestLongTokenRange.java @@ -0,0 +1,194 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.state; + +import static org.assertj.core.api.Assertions.assertThat; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +import java.math.BigInteger; + +public class TestLongTokenRange +{ + private static final BigInteger FULL_RANGE = BigInteger.valueOf(2).pow(64); + + @Test + public void testRangesEqual() + { + LongTokenRange range1 = new LongTokenRange(1, 2); + LongTokenRange range2 = new LongTokenRange(1, 2); + + assertThat(range1).isEqualTo(range2); + assertThat(range1.hashCode()).isEqualTo(range2.hashCode()); + } + + @Test + public void testRangesNotEqual() + { + LongTokenRange range1 = new LongTokenRange(1, 2); + LongTokenRange range2 = new LongTokenRange(2, 3); + + assertThat(range1).isNotEqualTo(range2); + } + + @Test + public void testIsWrapAroundNonWrapping() + { + assertThat(new LongTokenRange(1, 2).isWrapAround()).isFalse(); + } + + @Test + public void testIsWrapAroundWrapping() + { + assertThat(new LongTokenRange(2, 1).isWrapAround()).isTrue(); + } + + @Test + public void testIsWrapAroundFullRange() + { + assertThat(new LongTokenRange(Long.MIN_VALUE, Long.MIN_VALUE).isWrapAround()).isTrue(); + } + + @Test + public void testRangeSizeFullRange() + { + LongTokenRange tokenRange = new LongTokenRange(Long.MIN_VALUE, Long.MIN_VALUE); + + assertThat(tokenRange.rangeSize()).isEqualTo(FULL_RANGE); + } + + @Test + public void testRangeSizePositive() + { + LongTokenRange tokenRange = new LongTokenRange(10, 123456789); + BigInteger expectedRangeSize = BigInteger.valueOf(123456779); + + assertThat(tokenRange.rangeSize()).isEqualTo(expectedRangeSize); + } + + @Test + public void testRangeSizeNegative() + { + LongTokenRange tokenRange = new LongTokenRange(-123456789, -10); + BigInteger expectedRangeSize = BigInteger.valueOf(123456779); + + assertThat(tokenRange.rangeSize()).isEqualTo(expectedRangeSize); + } + + @Test + public void testRangeSizeNegativeToPositive() + { + LongTokenRange tokenRange = new LongTokenRange(-500, 1500); + BigInteger expectedRangeSize = BigInteger.valueOf(2000); + + assertThat(tokenRange.rangeSize()).isEqualTo(expectedRangeSize); + } + + @Test + public void testRangeSizeWrapAround() + { + LongTokenRange tokenRange = new LongTokenRange(10, -10); + BigInteger expectedRangeSize = FULL_RANGE.subtract(BigInteger.valueOf(20)); + + assertThat(tokenRange.rangeSize()).isEqualTo(expectedRangeSize); + } + + @Test + public void testNotCovering() + { + LongTokenRange range1 = new LongTokenRange(50, 500); + LongTokenRange range2 = new LongTokenRange(10, 40); + + assertThat(range1.isCovering(range2)).isFalse(); + } + + @Test + public void testNotCoveringNegative() + { + LongTokenRange range1 = new LongTokenRange(-500, -50); + LongTokenRange range2 = new LongTokenRange(-40, -10); + + assertThat(range1.isCovering(range2)).isFalse(); + } + + @Test + public void testNotCoveringOneWrapping() + { + LongTokenRange range1 = new LongTokenRange(50, -50); + LongTokenRange range2 = new LongTokenRange(10, 40); + + assertThat(range1.isCovering(range2)).isFalse(); + } + + @Test + public void testNotCoveringBothWrapping() + { + LongTokenRange range1 = new LongTokenRange(50, -50); + LongTokenRange range2 = new LongTokenRange(10, -40); + + assertThat(range1.isCovering(range2)).isFalse(); + } + + @Test + public void testCoveringSubRanges() + { + LongTokenRange range1 = new LongTokenRange(50, 500); + LongTokenRange range2 = new LongTokenRange(50, 200); + LongTokenRange range3 = new LongTokenRange(200, 350); + LongTokenRange range4 = new LongTokenRange(350, 500); + + assertThat(range1.isCovering(range2)).isTrue(); + assertThat(range1.isCovering(range3)).isTrue(); + assertThat(range1.isCovering(range4)).isTrue(); + } + + @Test + public void testCoveringSubRangesNegative() + { + LongTokenRange range1 = new LongTokenRange(-500, -50); + LongTokenRange range2 = new LongTokenRange(-500, -350); + LongTokenRange range3 = new LongTokenRange(-350, -200); + LongTokenRange range4 = new LongTokenRange(-200, -50); + + assertThat(range1.isCovering(range2)).isTrue(); + assertThat(range1.isCovering(range3)).isTrue(); + assertThat(range1.isCovering(range4)).isTrue(); + } + + @Test + public void testCoveringOneWrapping() + { + LongTokenRange range1 = new LongTokenRange(50, -50); + LongTokenRange range2 = new LongTokenRange(60, 70); + + assertThat(range1.isCovering(range2)).isTrue(); + } + + @Test + public void testCoveringBothWrapping() + { + LongTokenRange range1 = new LongTokenRange(50, -50); + LongTokenRange range2 = new LongTokenRange(60, -60); + + assertThat(range1.isCovering(range2)).isTrue(); + } + + @Test + public void testEqualsContract() + { + EqualsVerifier.forClass(LongTokenRange.class).usingGetClass().verify(); + } +} diff --git a/pom.xml b/pom.xml index 62a10baaa..df2ac8dcb 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,8 @@ connection.impl application data + core + core.impl