-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updated RFS to filter out system indices by default #763
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package com.rfs.common; | ||
|
||
import java.util.List; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Predicate; | ||
|
||
public class FilterScheme { | ||
|
||
public static Predicate<SnapshotRepo.Index> filterIndicesByAllowList(List<String> indexAllowlist, BiConsumer<String, Boolean> indexNameAcceptanceObserver) { | ||
return index -> { | ||
boolean accepted; | ||
if (indexAllowlist.isEmpty()) { | ||
accepted = !index.getName().startsWith("."); | ||
} else { | ||
accepted = indexAllowlist.contains(index.getName()); | ||
} | ||
|
||
indexNameAcceptanceObserver.accept(index.getName(), accepted); | ||
|
||
return accepted; | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,13 @@ | ||
package com.rfs.worker; | ||
|
||
import java.util.Optional; | ||
import java.util.List; | ||
import java.util.function.BiConsumer; | ||
|
||
import com.fasterxml.jackson.databind.node.ObjectNode; | ||
import com.rfs.common.SnapshotRepo; | ||
import lombok.AllArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import com.rfs.common.FilterScheme; | ||
import com.rfs.common.IndexMetadata; | ||
import com.rfs.transformers.Transformer; | ||
import com.rfs.version_os_2_11.IndexCreator_OS_2_11; | ||
|
@@ -19,18 +20,28 @@ | |
private final IndexMetadata.Factory metadataFactory; | ||
private final IndexCreator_OS_2_11 indexCreator; | ||
private final Transformer transformer; | ||
private final List<String> indexAllowlist; | ||
|
||
public void migrateIndices() { | ||
SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider(); | ||
// TODO - parallelize this, maybe ~400-1K requests per thread and do it asynchronously | ||
for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) { | ||
var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName()); | ||
var root = indexMetadata.toObjectNode(); | ||
var transformedRoot = transformer.transformIndexMetadata(root); | ||
var resultOp = indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId()); | ||
resultOp.ifPresentOrElse(value -> log.info("Index " + index.getName() + " created successfully"), | ||
() -> log.info("Index " + index.getName() + " already existed; no work required") | ||
); | ||
} | ||
|
||
BiConsumer<String, Boolean> logger = (indexName, accepted) -> { | ||
if (!accepted) { | ||
log.info("Index " + indexName + " rejected by allowlist"); | ||
} | ||
}; | ||
repoDataProvider.getIndicesInSnapshot(snapshotName).stream() | ||
.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger)) | ||
.peek(index -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we should be able to do a |
||
var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName()); | ||
var root = indexMetadata.toObjectNode(); | ||
var transformedRoot = transformer.transformIndexMetadata(root); | ||
var resultOp = indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId()); | ||
resultOp.ifPresentOrElse(value -> log.info("Index " + index.getName() + " created successfully"), | ||
() -> log.info("Index " + index.getName() + " already existed; no work required") | ||
); | ||
}) | ||
.count(); // Force the stream to execute | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. foreach() will force the stream to run. I think that this will generate a linting error as it is since the output of count() is discarded. |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
import com.rfs.cms.IWorkCoordinator; | ||
import com.rfs.cms.ScopedWorkCoordinator; | ||
import com.rfs.common.FilterScheme; | ||
import com.rfs.common.IndexMetadata; | ||
import com.rfs.common.SnapshotRepo; | ||
import lombok.Lombok; | ||
|
@@ -10,6 +11,9 @@ | |
|
||
import java.io.IOException; | ||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.function.BiConsumer; | ||
import java.util.stream.IntStream; | ||
|
||
/** | ||
* This class adds workitemes (leasable mutexes) via the WorkCoordinator so that future | ||
|
@@ -22,7 +26,7 @@ | |
public static final String SHARD_SETUP_WORK_ITEM_ID = "shard_setup"; | ||
|
||
public void run(ScopedWorkCoordinator scopedWorkCoordinator, IndexMetadata.Factory metadataFactory, | ||
String snapshotName) | ||
String snapshotName, List<String> indexAllowlist) | ||
throws IOException, InterruptedException { | ||
|
||
// ensure that there IS an index to house the shared state that we're going to be manipulating | ||
|
@@ -44,7 +48,7 @@ | |
|
||
@Override | ||
public Void onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) throws IOException { | ||
prepareShardWorkItems(scopedWorkCoordinator.workCoordinator, metadataFactory, snapshotName); | ||
prepareShardWorkItems(scopedWorkCoordinator.workCoordinator, metadataFactory, snapshotName, indexAllowlist); | ||
return null; | ||
} | ||
|
||
|
@@ -56,18 +60,32 @@ | |
} | ||
|
||
@SneakyThrows | ||
private static void prepareShardWorkItems(IWorkCoordinator workCoordinator, | ||
IndexMetadata.Factory metadataFactory, String snapshotName) { | ||
private static void prepareShardWorkItems(IWorkCoordinator workCoordinator, IndexMetadata.Factory metadataFactory, | ||
String snapshotName, List<String> indexAllowlist) { | ||
log.info("Setting up the Documents Work Items..."); | ||
SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider(); | ||
for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) { | ||
IndexMetadata.Data indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName()); | ||
log.info("Index " + indexMetadata.getName() + " has " + indexMetadata.getNumberOfShards() + " shards"); | ||
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) { | ||
log.info("Creating Documents Work Item for index: " + indexMetadata.getName() + ", shard: " + shardId); | ||
workCoordinator.createUnassignedWorkItem(IndexAndShard.formatAsWorkItemString(indexMetadata.getName(), shardId)); | ||
|
||
BiConsumer<String, Boolean> logger = (indexName, accepted) -> { | ||
if (!accepted) { | ||
log.info("Index " + indexName + " rejected by allowlist"); | ||
} | ||
} | ||
}; | ||
repoDataProvider.getIndicesInSnapshot(snapshotName).stream() | ||
.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger)) | ||
.peek(index -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: same as above with |
||
IndexMetadata.Data indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName()); | ||
log.info("Index " + indexMetadata.getName() + " has " + indexMetadata.getNumberOfShards() + " shards"); | ||
IntStream.range(0, indexMetadata.getNumberOfShards()).forEach(shardId -> { | ||
log.info("Creating Documents Work Item for index: " + indexMetadata.getName() + ", shard: " + shardId); | ||
try { | ||
workCoordinator.createUnassignedWorkItem(IndexAndShard.formatAsWorkItemString(indexMetadata.getName(), shardId)); | ||
} catch (IOException e) { | ||
throw Lombok.sneakyThrow(e); | ||
} | ||
}); | ||
}) | ||
.count(); // Force the stream to execute | ||
|
||
log.info("Finished setting up the Documents Work Items."); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make a backlog item for this to support regex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, done: https://opensearch.atlassian.net/browse/MIGRATIONS-1804