Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-16871: Race condition in CoordinatorHttpSolrCall synthetic collection/replica init #1762

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.solr.api.CoordinatorV2HttpSolrCall;
Expand Down Expand Up @@ -87,7 +88,7 @@ public static SolrCore getCore(
SolrCore core = null;
if (coll != null) {
String confName = coll.getConfigName();
String syntheticCollectionName = SYNTHETIC_COLL_PREFIX + confName;
String syntheticCollectionName = getSyntheticCollectionName(confName);

DocCollection syntheticColl = clusterState.getCollectionOrNull(syntheticCollectionName);
if (syntheticColl == null) {
Expand All @@ -96,9 +97,32 @@ public static SolrCore getCore(
log.info(
"synthetic collection: {} does not exist, creating.. ", syntheticCollectionName);
}
createColl(syntheticCollectionName, solrCall.cores, confName);
syntheticColl =
zkStateReader.getClusterState().getCollectionOrNull(syntheticCollectionName);

SolrException createException = null;
try {
createColl(syntheticCollectionName, solrCall.cores, confName);
} catch (SolrException exception) {
// concurrent requests could have created the collection hence causing collection exists
// exception
createException = exception;
} finally {
syntheticColl =
zkStateReader.getClusterState().getCollectionOrNull(syntheticCollectionName);
}

// then indeed the collection was not created properly, either by this or other concurrent
// requests
if (syntheticColl == null) {
if (createException != null) {
throw createException; // rethrow the exception since such collection was not created
} else {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Could not locate synthetic collection ["
+ syntheticCollectionName
+ "] after creation!");
}
}
}
List<Replica> nodeNameSyntheticReplicas =
syntheticColl.getReplicas(solrCall.cores.getZkController().getNodeName());
Expand All @@ -112,6 +136,32 @@ public static SolrCore getCore(

addReplica(syntheticCollectionName, solrCall.cores);
}
// still have to ensure that it's active, otherwise super.getCoreByCollection
// will return null and then CoordinatorHttpSolrCall will call getCore again
// hence creating a calling loop
try {
zkStateReader.waitForState(
syntheticCollectionName,
10,
TimeUnit.SECONDS,
docCollection -> {
for (Replica nodeNameSyntheticReplica :
docCollection.getReplicas(solrCall.cores.getZkController().getNodeName())) {
noblepaul marked this conversation as resolved.
Show resolved Hide resolved
if (nodeNameSyntheticReplica.getState() == Replica.State.ACTIVE) {
return true;
}
}
return false;
});
} catch (Exception e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Failed to wait for active replica for synthetic collection ["
+ syntheticCollectionName
+ "]",
e);
}

core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7% of developers fix this issue

THREAD_SAFETY_VIOLATION: Unprotected write. Non-private method CoordinatorHttpSolrCall.getCore(...) indirectly mutates container core.SolrResourceLoader.classNameCache via call to Map.put(...) outside of synchronization.
Reporting because a superclass class org.apache.solr.servlet.HttpSolrCall is annotated @ThreadSafe, so we assume that this method can run in parallel with other non-private methods in the class (including itself).


ℹ️ Expand to see all @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.

if (core != null) {
factory.collectionVsCoreNameMapping.put(collectionName, core.getName());
Expand Down Expand Up @@ -142,6 +192,10 @@ public static SolrCore getCore(
}
}

public static String getSyntheticCollectionName(String configName) {
return SYNTHETIC_COLL_PREFIX + configName;
}

/**
* Overrides the MDC context as the core set was synthetic core, which does not reflect the
* collection being operated on
Expand All @@ -158,9 +212,13 @@ private static void setMDCLoggingContext(String collectionName) {
private static void addReplica(String syntheticCollectionName, CoreContainer cores) {
SolrQueryResponse rsp = new SolrQueryResponse();
try {
String coreName =
syntheticCollectionName + "_" + cores.getZkController().getNodeName().replace(':', '_');
CollectionAdminRequest.AddReplica addReplicaRequest =
CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1")
.setCreateNodeSet(cores.getZkController().getNodeName());
// we are fixing the name, so that no two replicas are created in the same node
.setCoreName(coreName)
.setNode(cores.getZkController().getNodeName());
addReplicaRequest.setWaitForFinalState(true);
cores
.getCollectionsHandler()
Expand All @@ -171,6 +229,15 @@ private static void addReplica(String syntheticCollectionName, CoreContainer cor
"Could not auto-create collection: " + Utils.toJSONString(rsp.getValues()));
}
} catch (SolrException e) {
if (e.getMessage().contains("replica with the same core name already exists")) {
// another request has already created a replica for this synthetic collection
if (log.isInfoEnabled()) {
log.info(
"A replica is already created in this node for synthetic collection: {}",
syntheticCollectionName);
}
return;
}
throw e;

} catch (Exception e) {
Expand Down
108 changes: 106 additions & 2 deletions solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.solr.common.params.CommonParams.TRUE;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
Expand All @@ -33,6 +34,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
Expand All @@ -43,6 +45,7 @@
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
Expand All @@ -69,7 +72,7 @@ public void testSimple() throws Exception {
try {
CloudSolrClient client = cluster.getSolrClient();
String COLLECTION_NAME = "test_coll";
String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.getSyntheticCollectionName("conf");
CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
Expand Down Expand Up @@ -117,7 +120,7 @@ public void testMultiCollectionMultiNode() throws Exception {
try {
CloudSolrClient client = cluster.getSolrClient();
String COLLECTION_NAME = "test_coll";
String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.getSyntheticCollectionName("conf");
for (int j = 1; j <= 10; j++) {
String collname = COLLECTION_NAME + "_" + j;
CollectionAdminRequest.createCollection(collname, "conf", 2, 2)
Expand Down Expand Up @@ -481,6 +484,107 @@ private String getHostCoreName(String COLL, String qaNode, Consumer<SolrQuery> p
return (String) docs.get(0).getFieldValue("_core_");
}

public void testConcurrentAccess() throws Exception {
final int DATA_NODE_COUNT = 2;
final int COORDINATOR_NODE_COUNT = 4;
MiniSolrCloudCluster cluster =
configureCluster(DATA_NODE_COUNT).addConfig("conf", configset("cloud-minimal")).configure();

List<String> dataNodes =
cluster.getJettySolrRunners().stream()
.map(JettySolrRunner::getNodeName)
.collect(Collectors.toUnmodifiableList());

try {
CloudSolrClient client = cluster.getSolrClient();
String COLLECTION_PREFIX = "test_coll_";

final int COLLECTION_COUNT = 10;
final int DOC_PER_COLLECTION_COUNT = 1000;

List<String> collectionNames = new ArrayList<>();
for (int i = 0; i < COLLECTION_COUNT; i++) {
String collectionName = COLLECTION_PREFIX + i;
CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1)
.setCreateNodeSet(String.join(",", dataNodes)) // only put data onto the 2 data nodes
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, 2, 2);
collectionNames.add(collectionName);
}

for (String collectionName : collectionNames) {
UpdateRequest ur = new UpdateRequest();
for (int i = 0; i < DOC_PER_COLLECTION_COUNT; i++) {
SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField("id", collectionName + "-" + i);
ur.add(doc2);
}
ur.commit(client, collectionName);
QueryResponse rsp = client.query(collectionName, new SolrQuery("*:*"));
assertEquals(DOC_PER_COLLECTION_COUNT, rsp.getResults().getNumFound());
}

System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
List<String> coordinatorNodes = new ArrayList<>();
try {
for (int i = 0; i < COORDINATOR_NODE_COUNT; i++) {
JettySolrRunner coordinatorJetty = cluster.startJettySolrRunner();
coordinatorNodes.add(coordinatorJetty.getNodeName());
}
} finally {
System.clearProperty(NodeRoles.NODE_ROLES_PROP);
}

int THREAD_COUNT = 10;
int RUN_COUNT = 20;
// final AtomicInteger runCounter = new AtomicInteger();
// 10 threads to concurrently access the collections and ensure data are not mixed up
ExecutorService executorService =
ExecutorUtil.newMDCAwareFixedThreadPool(
THREAD_COUNT, new SolrNamedThreadFactory(this.getClass().getSimpleName()));
List<Future<?>> testFutures = new ArrayList<>();

for (int i = 0; i < RUN_COUNT; i++) {
final int currentRun = i;
testFutures.add(
executorService.submit(
() -> {
final String collectionName =
collectionNames.get(currentRun % collectionNames.size());
final String coordinatorNode =
coordinatorNodes.get(currentRun % coordinatorNodes.size());
QueryResponse response =
new QueryRequest(new SolrQuery("*:*"))
.setPreferredNodes(List.of(coordinatorNode))
.process(client, collectionName);
assertEquals(DOC_PER_COLLECTION_COUNT, response.getResults().getNumFound());
// ensure docs have the correct id (ie not mixing up with other collections)
for (SolrDocument doc : response.getResults()) {
assertTrue(((String) doc.getFieldValue("id")).startsWith(collectionName));
}
return null;
}));
}
for (Future<?> testFuture : testFutures) {
testFuture.get(); // check for any exceptions/failures
}

// number of replicas created in the synthetic collection should be one per coordinator node
assertEquals(
COORDINATOR_NODE_COUNT,
client
.getClusterState()
.getCollection(CoordinatorHttpSolrCall.getSyntheticCollectionName("conf"))
.getReplicas()
.size());

executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
} finally {
cluster.shutdown();
}
}

public void testWatch() throws Exception {
final int DATA_NODE_COUNT = 2;
MiniSolrCloudCluster cluster =
Expand Down
Loading