Snapshot status #757

Merged 8 commits on Jun 25, 2024
24 changes: 19 additions & 5 deletions CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
@@ -57,6 +57,14 @@ public static class Args {
@Parameter(names = {"--source-insecure"},
description = "Allow untrusted SSL certificates for source")
public boolean sourceInsecure = false;

@Parameter(names = {"--no-wait"}, description = "Optional. If provided, the snapshot runner will not wait for completion")
public boolean noWait = false;

@Parameter(names = {"--max-snapshot-rate-mb-per-node"},
required = false,
description = "The maximum snapshot rate in megabytes per second per node")
public Integer maxSnapshotRateMBPerNode;
}

@Getter
@@ -87,16 +95,22 @@ public static void main(String[] args) throws Exception {
log.info("Running CreateSnapshot with {}", String.join(" ", args));
run(c -> ((arguments.fileSystemRepoPath != null)
? new FileSystemSnapshotCreator(arguments.snapshotName, c, arguments.fileSystemRepoPath)
: new S3SnapshotCreator(arguments.snapshotName, c, arguments.s3RepoUri, arguments.s3Region)),
new OpenSearchClient(arguments.sourceHost, arguments.sourceUser, arguments.sourcePass, arguments.sourceInsecure)
: new S3SnapshotCreator(arguments.snapshotName, c, arguments.s3RepoUri, arguments.s3Region, arguments.maxSnapshotRateMBPerNode)),
new OpenSearchClient(arguments.sourceHost, arguments.sourceUser, arguments.sourcePass, arguments.sourceInsecure),
arguments.noWait
);
}

public static void run(Function<OpenSearchClient, SnapshotCreator> snapshotCreatorFactory,
OpenSearchClient openSearchClient)
throws Exception {
OpenSearchClient openSearchClient, boolean noWait) throws Exception {
TryHandlePhaseFailure.executeWithTryCatch(() -> {
SnapshotRunner.runAndWaitForCompletion(snapshotCreatorFactory.apply(openSearchClient));
if (noWait) {
SnapshotRunner.run(snapshotCreatorFactory.apply(openSearchClient));
} else {
SnapshotRunner.runAndWaitForCompletion(snapshotCreatorFactory.apply(openSearchClient));
}
});
}
}
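For context, a minimal sketch of calling the updated run() overload from Java (mirroring the integration test further down in this PR). The snapshot name, repo path, and connection details below are placeholders, not values from this change, and run(...) throws Exception, so a real caller handles or propagates it:

    // Sketch only: kick off a snapshot and return immediately (noWait = true).
    CreateSnapshot.run(
        c -> new FileSystemSnapshotCreator("my-snapshot", c, "/snapshots/repo"),
        new OpenSearchClient("http://localhost:9200", null, null, false),
        true);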


@@ -131,7 +131,8 @@ public static void main(String[] args) throws Exception {
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
arguments.snapshotName, arguments.indexAllowlist, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
arguments.snapshotName, arguments.indexAllowlist, shardMetadataFactory, unpackerFactory,
arguments.maxShardSizeBytes);
});
}

@@ -100,7 +100,8 @@ public void test(String sourceImageName, String targetImageName, int numWorkers)
final List<String> INDEX_ALLOWLIST = List.of();
CreateSnapshot.run(
c -> new FileSystemSnapshotCreator(SNAPSHOT_NAME, c, ElasticsearchContainer.CLUSTER_SNAPSHOT_DIR),
new OpenSearchClient(esSourceContainer.getUrl(), null));
new OpenSearchClient(esSourceContainer.getUrl(), null),
false);
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
try {
esSourceContainer.copySnapshotData(tempDir.toString());
13 changes: 6 additions & 7 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
@@ -125,7 +125,7 @@ public static void main(String[] args) throws InterruptedException {

Logging.setLevel(logLevel);

ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

// Sanity checks
@@ -142,7 +142,7 @@ public static void main(String[] args) throws InterruptedException {
* 1. A local snapshot directory
* 2. A source host we'll take the snapshot from
* 3. An S3 URI of an existing snapshot in S3
*
*
* If you provide the source host, you still need to provide the S3 details or the snapshotLocalRepoDirPath to write the snapshot to.
*/
if (snapshotDirPath != null && (sourceHost != null || s3RepoUri != null)) {
@@ -178,7 +178,7 @@ public static void main(String[] args) throws InterruptedException {
if (sourceHost != null) {
// ==========================================================================================================
// Create the snapshot if necessary
// ==========================================================================================================
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to create the snapshot...");
OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
@@ -346,9 +346,8 @@ public static void main(String[] args) throws InterruptedException {
}

// Unpack the shard
try (SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata)) {
unpacker.unpack();
}
SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata);
unpacker.unpack();
}
}

@@ -383,7 +382,7 @@ public static void main(String[] args) throws InterruptedException {
reindexer.refreshAllDocuments(targetConnection);
logger.info("Refresh complete");
}

} catch (Exception e) {
e.printStackTrace();
}
4 changes: 4 additions & 0 deletions RFS/src/main/java/com/rfs/common/ClusterVersion.java
@@ -31,6 +31,10 @@ public static ClusterVersion fromInt(int versionId) {
return ES_6_8;
} else if (versionString.startsWith("710")){
return ES_7_10;
}
// temp bypass for 717 to 710
else if (versionString.startsWith("717")){
return ES_7_10;
} else {
throw new IllegalArgumentException("Invalid version: " + versionId);
}
2 changes: 1 addition & 1 deletion RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -17,7 +17,7 @@ public class DocumentReindexer {
private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen
protected final OpenSearchClient client;

public Mono<Void> reindex(String indexName, Flux<Document> documentStream) {
public Mono<Void> reindex(String indexName, Flux<Document> documentStream) {

return documentStream
.map(this::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
5 changes: 5 additions & 0 deletions RFS/src/main/java/com/rfs/common/OpenSearchClient.java
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -24,6 +25,10 @@ public class OpenSearchClient {
private static final Logger logger = LogManager.getLogger(OpenSearchClient.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

public final ConnectionDetails connectionDetails;
private final RestClient client;

13 changes: 9 additions & 4 deletions RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java
@@ -6,17 +6,19 @@
public class S3SnapshotCreator extends SnapshotCreator {
private static final ObjectMapper mapper = new ObjectMapper();

private final OpenSearchClient client;
private final String snapshotName;
private final String s3Uri;
private final String s3Region;
private final Integer maxSnapshotRateMBPerNode;

public S3SnapshotCreator(String snapshotName, OpenSearchClient client, String s3Uri, String s3Region) {
this(snapshotName, client, s3Uri, s3Region, null);
}

public S3SnapshotCreator(String snapshotName, OpenSearchClient client, String s3Uri, String s3Region, Integer maxSnapshotRateMBPerNode) {
super(snapshotName, client);
this.snapshotName = snapshotName;
this.client = client;
this.s3Uri = s3Uri;
this.s3Region = s3Region;
this.maxSnapshotRateMBPerNode = maxSnapshotRateMBPerNode;
}

@Override
@@ -26,6 +28,9 @@ public ObjectNode getRequestBodyForRegisterRepo() {
settings.put("bucket", getBucketName());
settings.put("region", s3Region);
settings.put("base_path", getBasePath());
if (maxSnapshotRateMBPerNode != null) {
settings.put("max_snapshot_bytes_per_sec", maxSnapshotRateMBPerNode + "mb");
}

ObjectNode body = mapper.createObjectNode();
body.put("type", "s3");
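For illustration, a sketch of how the new throttle surfaces in the repository registration body. The S3 URI, region, and throttle value are placeholders, and the bucket/base_path split assumes getBucketName() and getBasePath() parse them out of the URI:

    // Sketch: register the repo with a 100 MB/s per-node snapshot throttle.
    S3SnapshotCreator creator = new S3SnapshotCreator(
        "rfs-snapshot", client, "s3://my-bucket/rfs-repo", "us-east-2", 100);
    ObjectNode body = creator.getRequestBodyForRegisterRepo();
    // body now serializes roughly as:
    // {"type":"s3","settings":{"bucket":"my-bucket","region":"us-east-2",
    //   "base_path":"rfs-repo","max_snapshot_bytes_per_sec":"100mb"}}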
4 changes: 4 additions & 0 deletions RFS/src/main/java/com/rfs/common/SnapshotCreator.java
@@ -1,5 +1,6 @@
package com.rfs.common;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -11,6 +12,9 @@
public abstract class SnapshotCreator {
private static final Logger logger = LogManager.getLogger(SnapshotCreator.class);
private static final ObjectMapper mapper = new ObjectMapper();
static {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

private final OpenSearchClient client;
private final String snapshotName;
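A minimal sketch of what disabling FAIL_ON_UNKNOWN_PROPERTIES buys here; the response class and JSON payload are illustrative only, not part of this PR (readValue throws JsonProcessingException, handled by the caller):

    // Hypothetical response type; the class and field names are illustrative.
    public static class SnapshotStatusResponse {
        public String state;
    }

    ObjectMapper lenient = new ObjectMapper();
    lenient.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    // "shards_stats" is not declared on the POJO; with the feature disabled it is skipped
    // instead of failing with UnrecognizedPropertyException.
    SnapshotStatusResponse status = lenient.readValue(
        "{\"state\":\"SUCCESS\",\"shards_stats\":{\"done\":5}}", SnapshotStatusResponse.class);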
36 changes: 2 additions & 34 deletions RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java
@@ -1,8 +1,6 @@
package com.rfs.common;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -18,7 +16,7 @@
import org.apache.lucene.util.BytesRef;

@RequiredArgsConstructor
public class SnapshotShardUnpacker implements AutoCloseable {
public class SnapshotShardUnpacker {
private static final Logger logger = LogManager.getLogger(SnapshotShardUnpacker.class);
private final SourceRepoAccessor repoAccessor;
private final Path luceneFilesBasePath;
@@ -72,40 +70,10 @@ public Path unpack() {
}
}

@Override
public void close() {
Collaborator: do we care about these things not getting cleaned up?

Member (author): No, we want to recycle tasks between shards with ECS. When this was in there, we were deleting the shard data before we reindexed it.

try {
Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId());
if (Files.exists(luceneIndexDir)) {
deleteRecursively(luceneIndexDir);
}

} catch (Exception e) {
throw new CouldNotCleanUpShard("Could not clean up shard: Index " + shardMetadata.getIndexId() + ", Shard " + shardMetadata.getShardId(), e);
}
}

protected void deleteRecursively(Path path) throws IOException {
if (Files.isDirectory(path)) {
try (DirectoryStream<Path> entries = Files.newDirectoryStream(path)) {
for (Path entry : entries) {
deleteRecursively(entry);
}
}
}
Files.delete(path);
}

public static class CouldNotCleanUpShard extends RfsException {
public CouldNotCleanUpShard(String message, Exception e) {
super(message, e);
}
}

public static class CouldNotUnpackShard extends RfsException {
public CouldNotUnpackShard(String message, Exception e) {
super(message, e);
}
}

}
@@ -11,6 +11,7 @@
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -19,6 +20,10 @@
public class ShardMetadataData_ES_7_10 implements ShardMetadata.Data {
private static final ObjectMapper objectMapper = new ObjectMapper();

static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

private String snapshotName;
private String indexName;
private String indexId;
@@ -1,5 +1,6 @@
package com.rfs.version_es_7_10;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -18,6 +19,7 @@ public class ShardMetadataFactory_ES_7_10 implements ShardMetadata.Factory {
@Override
public ShardMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName, int shardId) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
SimpleModule module = new SimpleModule();
module.addDeserializer(ShardMetadataData_ES_7_10.FileInfoRaw.class, new ShardMetadataData_ES_7_10.FileInfoRawDeserializer());
objectMapper.registerModule(module);
@@ -1,5 +1,6 @@
package com.rfs.version_es_7_10;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -16,6 +17,7 @@ public class SnapshotMetadataFactory_ES_7_10 implements com.rfs.common.SnapshotM
@Override
public SnapshotMetadata.Data fromJsonNode(JsonNode root) throws Exception {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
ObjectNode objectNodeRoot = (ObjectNode) root;
SnapshotMetadataData_ES_7_10 snapshotMetadata = mapper.treeToValue(objectNodeRoot.get("snapshot"), SnapshotMetadataData_ES_7_10.class);
return snapshotMetadata;
23 changes: 11 additions & 12 deletions RFS/src/main/java/com/rfs/worker/DocumentsRunner.java
@@ -77,18 +77,17 @@ private void doDocumentsMigration(IndexAndShard indexAndShard) {
log.info("Migrating docs for " + indexAndShard);
ShardMetadata.Data shardMetadata = shardMetadataFactory.apply(indexAndShard.indexName, indexAndShard.shard);

try (var unpacker = unpackerFactory.create(shardMetadata)) {
var reader = readerFactory.apply(unpacker.unpack());
Flux<Document> documents = reader.readDocuments();
var unpacker = unpackerFactory.create(shardMetadata);
var reader = readerFactory.apply(unpacker.unpack());
Flux<Document> documents = reader.readDocuments();

reindexer.reindex(shardMetadata.getIndexName(), documents)
.doOnError(error -> log.error("Error during reindexing: " + error))
.doOnSuccess(done -> log.atInfo()
.setMessage(()->"Reindexing completed for Index " + shardMetadata.getIndexName() +
", Shard " + shardMetadata.getShardId()).log())
// Wait for the reindexing to complete before proceeding
.block();
log.info("Docs migrated");
}
reindexer.reindex(shardMetadata.getIndexName(), documents)
.doOnError(error -> log.error("Error during reindexing: " + error))
.doOnSuccess(done -> log.atInfo()
.setMessage(()->"Reindexing completed for Index " + shardMetadata.getIndexName() +
", Shard " + shardMetadata.getShardId()).log())
// Wait for the reindexing to complete before proceeding
.block();
log.info("Docs migrated");
}
}
15 changes: 10 additions & 5 deletions RFS/src/main/java/com/rfs/worker/SnapshotRunner.java
@@ -18,16 +18,21 @@ protected static void waitForSnapshotToFinish(SnapshotCreator snapshotCreator) t

public static void runAndWaitForCompletion(SnapshotCreator snapshotCreator) {
try {
log.info("Attempting to initiate the snapshot...");
snapshotCreator.registerRepo();
snapshotCreator.createSnapshot();

log.info("Snapshot in progress...");
run(snapshotCreator);
waitForSnapshotToFinish(snapshotCreator);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Interrupted while waiting for Snapshot to complete", e);
throw new SnapshotCreator.SnapshotCreationFailed(snapshotCreator.getSnapshotName());
}
}

public static void run(SnapshotCreator snapshotCreator) {
log.info("Attempting to initiate the snapshot...");
snapshotCreator.registerRepo();
snapshotCreator.createSnapshot();
log.info("Snapshot in progress...");
}


}
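A small usage sketch of the resulting split, assuming the caller already has a SnapshotCreator instance:

    // Fire-and-forget (what --no-wait maps to): register the repo, start the snapshot, return.
    SnapshotRunner.run(snapshotCreator);

    // Default behavior: same as above, then block until the snapshot finishes,
    // converting an interrupt into SnapshotCreationFailed.
    SnapshotRunner.runAndWaitForCompletion(snapshotCreator);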
2 changes: 1 addition & 1 deletion RFS/src/main/resources/log4j2.xml
@@ -2,7 +2,7 @@
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %msg%n"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} %p %c{1.} [%t] %m%n"/>
</Console>
</Appenders>
<Loggers>
@@ -48,9 +48,8 @@ def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument("--endpoint", help="Cluster endpoint e.g. http://test.elb.us-west-2.amazonaws.com:9200.")
parser.add_argument("--username", help="Cluster username.", default='admin')
parser.add_argument("--password", help="Cluster password.", default='admin')
parser.add_argument("--no-auth", action='store_true', help="Flag to provide no auth in requests.")
parser.add_argument("--username", help="Cluster username.")
parser.add_argument("--password", help="Cluster password.")
parser.add_argument("--no-clear-output", action='store_true',
help="Flag to not clear the output before each run. " +
"Helpful for piping to a file or other utility.")
@@ -111,7 +110,7 @@ def main():
args = parse_args()

url_base = args.endpoint or os.environ.get('SOURCE_DOMAIN_ENDPOINT', 'https://capture-proxy:9200')
auth = None if args.no_auth else (args.username, args.password)
auth = (args.username, args.password) if args.username and args.password else None

session = requests.Session()
keep_alive_headers = {'Connection': 'keep-alive'}