-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updated RFS to filter out system indices by default #763
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package com.rfs.common; | ||
|
||
import java.util.List; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Predicate; | ||
|
||
public class FilterScheme { | ||
|
||
public static Predicate<SnapshotRepo.Index> filterIndicesByAllowList(List<String> indexAllowlist, BiConsumer<String, Boolean> indexNameAcceptanceObserver) { | ||
return index -> { | ||
boolean accepted; | ||
if (indexAllowlist.isEmpty()) { | ||
accepted = !index.getName().startsWith("."); | ||
} else { | ||
accepted = indexAllowlist.contains(index.getName()); | ||
} | ||
|
||
indexNameAcceptanceObserver.accept(index.getName(), accepted); | ||
|
||
return accepted; | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,13 @@ | ||
package com.rfs.worker; | ||
|
||
import java.util.Optional; | ||
import java.util.List; | ||
import java.util.function.BiConsumer; | ||
|
||
import com.fasterxml.jackson.databind.node.ObjectNode; | ||
import com.rfs.common.SnapshotRepo; | ||
import lombok.AllArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import com.rfs.common.FilterScheme; | ||
import com.rfs.common.IndexMetadata; | ||
import com.rfs.transformers.Transformer; | ||
import com.rfs.version_os_2_11.IndexCreator_OS_2_11; | ||
|
@@ -19,18 +20,28 @@ | |
private final IndexMetadata.Factory metadataFactory; | ||
private final IndexCreator_OS_2_11 indexCreator; | ||
private final Transformer transformer; | ||
private final List<String> indexAllowlist; | ||
|
||
public void migrateIndices() { | ||
SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider(); | ||
// TODO - parallelize this, maybe ~400-1K requests per thread and do it asynchronously | ||
for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) { | ||
var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName()); | ||
var root = indexMetadata.toObjectNode(); | ||
var transformedRoot = transformer.transformIndexMetadata(root); | ||
var resultOp = indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId()); | ||
resultOp.ifPresentOrElse(value -> log.info("Index " + index.getName() + " created successfully"), | ||
() -> log.info("Index " + index.getName() + " already existed; no work required") | ||
); | ||
} | ||
|
||
BiConsumer<String, Boolean> logger = (indexName, accepted) -> { | ||
if (!accepted) { | ||
log.info("Index " + indexName + " rejected by allowlist"); | ||
} | ||
}; | ||
repoDataProvider.getIndicesInSnapshot(snapshotName).stream() | ||
.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger)) | ||
.peek(index -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we should be able to do a |
||
var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName()); | ||
var root = indexMetadata.toObjectNode(); | ||
var transformedRoot = transformer.transformIndexMetadata(root); | ||
var resultOp = indexCreator.create(transformedRoot, index.getName(), indexMetadata.getId()); | ||
resultOp.ifPresentOrElse(value -> log.info("Index " + index.getName() + " created successfully"), | ||
() -> log.info("Index " + index.getName() + " already existed; no work required") | ||
); | ||
}) | ||
.count(); // Force the stream to execute | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. foreach() will force the stream to run. I think that this will generate a linting error as it is since the output of count() is discarded. |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make a backlog item for this to support regex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, done: https://opensearch.atlassian.net/browse/MIGRATIONS-1804