Snapshot status #757

Merged: 8 commits, Jun 25, 2024
Showing changes from 6 commits
24 changes: 19 additions & 5 deletions CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
@@ -57,6 +57,14 @@ public static class Args {
@Parameter(names = {"--source-insecure"},
description = "Allow untrusted SSL certificates for source")
public boolean sourceInsecure = false;

@Parameter(names = {"--no-wait"}, description = "Optional. If provided, the snapshot runner will not wait for completion")
public boolean noWait = false;

@Parameter(names = {"--max-snapshot-rate-mb-per-node"},
required = false,
description = "The maximum snapshot rate in megabytes per second per node")
public Integer maxSnapshotRateMBPerNode;
}

@Getter
@@ -87,16 +95,22 @@ public static void main(String[] args) throws Exception {
log.info("Running CreateSnapshot with {}", String.join(" ", args));
run(c -> ((arguments.fileSystemRepoPath != null)
? new FileSystemSnapshotCreator(arguments.snapshotName, c, arguments.fileSystemRepoPath)
: new S3SnapshotCreator(arguments.snapshotName, c, arguments.s3RepoUri, arguments.s3Region)),
new OpenSearchClient(arguments.sourceHost, arguments.sourceUser, arguments.sourcePass, arguments.sourceInsecure)
: new S3SnapshotCreator(arguments.snapshotName, c, arguments.s3RepoUri, arguments.s3Region, arguments.maxSnapshotRateMBPerNode)),
new OpenSearchClient(arguments.sourceHost, arguments.sourceUser, arguments.sourcePass, arguments.sourceInsecure),
arguments.noWait
);
}

public static void run(Function<OpenSearchClient, SnapshotCreator> snapshotCreatorFactory,
OpenSearchClient openSearchClient)
throws Exception {
OpenSearchClient openSearchClient, boolean noWait) throws Exception {
TryHandlePhaseFailure.executeWithTryCatch(() -> {
SnapshotRunner.runAndWaitForCompletion(snapshotCreatorFactory.apply(openSearchClient));
if (noWait) {
SnapshotRunner.run(snapshotCreatorFactory.apply(openSearchClient));
} else {
SnapshotRunner.runAndWaitForCompletion(snapshotCreatorFactory.apply(openSearchClient));
}
});
}
}
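For illustration, a sketch of an invocation exercising both new options together: take an S3 snapshot, return without waiting, and cap the write rate at 100 MB/s per node. Only --no-wait and --max-snapshot-rate-mb-per-node are confirmed by this diff; the remaining flag names are assumptions derived from the Args field names, and the endpoint and S3 values are placeholders.

    public class CreateSnapshotExample {
        public static void main(String[] rawArgs) throws Exception {
            // Hypothetical CLI arguments; flag names other than --no-wait and
            // --max-snapshot-rate-mb-per-node are assumed from the Args fields.
            String[] cliArgs = {
                "--snapshot-name", "rfs-snapshot",
                "--s3-repo-uri", "s3://example-bucket/rfs-repo",
                "--s3-region", "us-east-1",
                "--source-host", "http://localhost:9200",
                "--no-wait",                              // return once the snapshot is initiated
                "--max-snapshot-rate-mb-per-node", "100"  // becomes "100mb" in the repo settings
            };
            CreateSnapshot.main(cliArgs);
        }
    }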


@@ -131,7 +131,8 @@ public static void main(String[] args) throws Exception {
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
arguments.snapshotName, arguments.indexAllowlist, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
arguments.snapshotName, arguments.indexAllowlist, shardMetadataFactory, unpackerFactory,
arguments.maxShardSizeBytes);
});
}

@@ -100,7 +100,8 @@ public void test(String sourceImageName, String targetImageName, int numWorkers)
final List<String> INDEX_ALLOWLIST = List.of();
CreateSnapshot.run(
c -> new FileSystemSnapshotCreator(SNAPSHOT_NAME, c, ElasticsearchContainer.CLUSTER_SNAPSHOT_DIR),
new OpenSearchClient(esSourceContainer.getUrl(), null));
new OpenSearchClient(esSourceContainer.getUrl(), null),
false);
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
try {
esSourceContainer.copySnapshotData(tempDir.toString());
13 changes: 6 additions & 7 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
@@ -125,7 +125,7 @@

Logging.setLevel(logLevel);

ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

// Sanity checks
@@ -142,7 +142,7 @@
* 1. A local snapshot directory
* 2. A source host we'll take the snapshot from
* 3. An S3 URI of an existing snapshot in S3
*
* If you provide the source host, you still need to provide the S3 details or the snapshotLocalRepoDirPath to write the snapshot to.
*/
if (snapshotDirPath != null && (sourceHost != null || s3RepoUri != null)) {
@@ -178,7 +178,7 @@
if (sourceHost != null) {
// ==========================================================================================================
// Create the snapshot if necessary
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to create the snapshot...");
OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
@@ -346,9 +346,8 @@
}

// Unpack the shard
try (SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata)) {
unpacker.unpack();
}
SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata);
unpacker.unpack();
}
}

@@ -383,7 +382,7 @@
reindexer.refreshAllDocuments(targetConnection);
logger.info("Refresh complete");
}

} catch (Exception e) {
e.printStackTrace();
}
4 changes: 4 additions & 0 deletions RFS/src/main/java/com/rfs/common/ClusterVersion.java
@@ -31,6 +31,10 @@
return ES_6_8;
} else if (versionString.startsWith("710")){
return ES_7_10;
}
// Temporary bypass: treat version 7.17 as 7.10
else if (versionString.startsWith("717")){
return ES_7_10;
} else {
throw new IllegalArgumentException("Invalid version: " + versionId);
}
2 changes: 1 addition & 1 deletion RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -17,7 +17,7 @@ public class DocumentReindexer {
private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen
protected final OpenSearchClient client;

public Mono<Void> reindex(String indexName, Flux<Document> documentStream) {

return documentStream
.map(this::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
5 changes: 5 additions & 0 deletions RFS/src/main/java/com/rfs/common/OpenSearchClient.java
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -24,6 +25,10 @@ public class OpenSearchClient {
private static final Logger logger = LogManager.getLogger(OpenSearchClient.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

public final ConnectionDetails connectionDetails;
private final RestClient client;

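As a minimal sketch of what the new static initializer buys (the SnapshotStatus type and JSON payload below are hypothetical, not from this PR): with FAIL_ON_UNKNOWN_PROPERTIES disabled, Jackson skips response fields that have no matching Java property instead of throwing UnrecognizedPropertyException, which matters when the cluster returns more fields than the client models.

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class UnknownPropsDemo {
        // Hypothetical response type modeling only one field of a larger payload.
        static class SnapshotStatus {
            public String state;
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            // "shards_stats" has no matching field; with the feature disabled it
            // is ignored rather than aborting deserialization.
            SnapshotStatus status = mapper.readValue(
                    "{\"state\":\"IN_PROGRESS\",\"shards_stats\":{\"done\":3}}",
                    SnapshotStatus.class);
            System.out.println(status.state); // IN_PROGRESS
        }
    }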
13 changes: 9 additions & 4 deletions RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java
@@ -6,17 +6,19 @@
public class S3SnapshotCreator extends SnapshotCreator {
private static final ObjectMapper mapper = new ObjectMapper();

private final OpenSearchClient client;
private final String snapshotName;
private final String s3Uri;
private final String s3Region;
private final Integer maxSnapshotRateMBPerNode;

public S3SnapshotCreator(String snapshotName, OpenSearchClient client, String s3Uri, String s3Region) {
this(snapshotName, client, s3Uri, s3Region, null);
}

public S3SnapshotCreator(String snapshotName, OpenSearchClient client, String s3Uri, String s3Region, Integer maxSnapshotRateMBPerNode) {
super(snapshotName, client);
this.snapshotName = snapshotName;
this.client = client;
this.s3Uri = s3Uri;
this.s3Region = s3Region;
this.maxSnapshotRateMBPerNode = maxSnapshotRateMBPerNode;
}

@Override
@@ -26,6 +28,9 @@
settings.put("bucket", getBucketName());
settings.put("region", s3Region);
settings.put("base_path", getBasePath());
if (maxSnapshotRateMBPerNode != null) {
settings.put("max_snapshot_bytes_per_sec", maxSnapshotRateMBPerNode + "mb");
}

ObjectNode body = mapper.createObjectNode();
body.put("type", "s3");
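For reference, the settings assembled above feed the JSON body built here (and presumably sent when the repository is registered via the cluster's _snapshot API); with --max-snapshot-rate-mb-per-node 100, the payload would look roughly like this, where the bucket and base_path values are placeholders derived from the s3Uri:

    {
      "type": "s3",
      "settings": {
        "bucket": "example-bucket",
        "region": "us-east-1",
        "base_path": "rfs-repo",
        "max_snapshot_bytes_per_sec": "100mb"
      }
    }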
4 changes: 4 additions & 0 deletions RFS/src/main/java/com/rfs/common/SnapshotCreator.java
@@ -1,5 +1,6 @@
package com.rfs.common;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -11,6 +12,9 @@
public abstract class SnapshotCreator {
private static final Logger logger = LogManager.getLogger(SnapshotCreator.class);
private static final ObjectMapper mapper = new ObjectMapper();
static {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

private final OpenSearchClient client;
private final String snapshotName;
36 changes: 2 additions & 34 deletions RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java
@@ -1,8 +1,6 @@
package com.rfs.common;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -18,7 +16,7 @@
import org.apache.lucene.util.BytesRef;

@RequiredArgsConstructor
public class SnapshotShardUnpacker implements AutoCloseable {
public class SnapshotShardUnpacker {
private static final Logger logger = LogManager.getLogger(SnapshotShardUnpacker.class);
private final SourceRepoAccessor repoAccessor;
private final Path luceneFilesBasePath;
@@ -72,40 +70,10 @@ public Path unpack() {
}
}

@Override
public void close() {
Review thread on the removed close() method:

Collaborator: do we care about these things not getting cleaned up?

Member (author): No, we want to recycle tasks between shards with ECS. When this was in there, we were deleting the shard data before we reindexed it.

try {
Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId());
if (Files.exists(luceneIndexDir)) {
deleteRecursively(luceneIndexDir);
}

} catch (Exception e) {
throw new CouldNotCleanUpShard("Could not clean up shard: Index " + shardMetadata.getIndexId() + ", Shard " + shardMetadata.getShardId(), e);
}
}

protected void deleteRecursively(Path path) throws IOException {
if (Files.isDirectory(path)) {
try (DirectoryStream<Path> entries = Files.newDirectoryStream(path)) {
for (Path entry : entries) {
deleteRecursively(entry);
}
}
}
Files.delete(path);
}

public static class CouldNotCleanUpShard extends RfsException {
public CouldNotCleanUpShard(String message, Exception e) {
super(message, e);
}
}

public static class CouldNotUnpackShard extends RfsException {
public CouldNotUnpackShard(String message, Exception e) {
super(message, e);
}
}

}
RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataData_ES_7_10.java
@@ -11,6 +11,7 @@
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -19,6 +20,10 @@
public class ShardMetadataData_ES_7_10 implements ShardMetadata.Data {
private static final ObjectMapper objectMapper = new ObjectMapper();

static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

private String snapshotName;
private String indexName;
private String indexId;
RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataFactory_ES_7_10.java
@@ -1,5 +1,6 @@
package com.rfs.version_es_7_10;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -18,6 +19,7 @@ public class ShardMetadataFactory_ES_7_10 implements ShardMetadata.Factory {
@Override
public ShardMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName, int shardId) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
SimpleModule module = new SimpleModule();
module.addDeserializer(ShardMetadataData_ES_7_10.FileInfoRaw.class, new ShardMetadataData_ES_7_10.FileInfoRawDeserializer());
objectMapper.registerModule(module);
RFS/src/main/java/com/rfs/version_es_7_10/SnapshotMetadataFactory_ES_7_10.java
@@ -1,5 +1,6 @@
package com.rfs.version_es_7_10;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -16,6 +17,7 @@
@Override
public SnapshotMetadata.Data fromJsonNode(JsonNode root) throws Exception {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
ObjectNode objectNodeRoot = (ObjectNode) root;
SnapshotMetadataData_ES_7_10 snapshotMetadata = mapper.treeToValue(objectNodeRoot.get("snapshot"), SnapshotMetadataData_ES_7_10.class);
return snapshotMetadata;
23 changes: 11 additions & 12 deletions RFS/src/main/java/com/rfs/worker/DocumentsRunner.java
@@ -77,18 +77,17 @@
log.info("Migrating docs for " + indexAndShard);
ShardMetadata.Data shardMetadata = shardMetadataFactory.apply(indexAndShard.indexName, indexAndShard.shard);

try (var unpacker = unpackerFactory.create(shardMetadata)) {
var reader = readerFactory.apply(unpacker.unpack());
Flux<Document> documents = reader.readDocuments();
var unpacker = unpackerFactory.create(shardMetadata);
var reader = readerFactory.apply(unpacker.unpack());
Flux<Document> documents = reader.readDocuments();

reindexer.reindex(shardMetadata.getIndexName(), documents)
.doOnError(error -> log.error("Error during reindexing: " + error))
.doOnSuccess(done -> log.atInfo()
.setMessage(()->"Reindexing completed for Index " + shardMetadata.getIndexName() +
", Shard " + shardMetadata.getShardId()).log())
// Wait for the reindexing to complete before proceeding
.block();
log.info("Docs migrated");
}
reindexer.reindex(shardMetadata.getIndexName(), documents)
.doOnError(error -> log.error("Error during reindexing: " + error))
.doOnSuccess(done -> log.atInfo()
.setMessage(()->"Reindexing completed for Index " + shardMetadata.getIndexName() +
", Shard " + shardMetadata.getShardId()).log())
// Wait for the reindexing to complete before proceeding
.block();
log.info("Docs migrated");
}
}
15 changes: 10 additions & 5 deletions RFS/src/main/java/com/rfs/worker/SnapshotRunner.java
@@ -18,16 +18,21 @@

public static void runAndWaitForCompletion(SnapshotCreator snapshotCreator) {
try {
log.info("Attempting to initiate the snapshot...");
snapshotCreator.registerRepo();
snapshotCreator.createSnapshot();

log.info("Snapshot in progress...");
run(snapshotCreator);
waitForSnapshotToFinish(snapshotCreator);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Interrupted while waiting for Snapshot to complete", e);
throw new SnapshotCreator.SnapshotCreationFailed(snapshotCreator.getSnapshotName());
}
}

public static void run(SnapshotCreator snapshotCreator) {
log.info("Attempting to initiate the snapshot...");
snapshotCreator.registerRepo();
snapshotCreator.createSnapshot();
log.info("Snapshot in progress...");
}

}
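A short sketch of how a caller picks between the two entry points, mirroring the noWait branch added to CreateSnapshot.run above; with run alone, snapshot progress would have to be checked later out-of-band (for example via the cluster's snapshot status API) rather than blocking in-process:

    SnapshotCreator creator = snapshotCreatorFactory.apply(openSearchClient);
    if (noWait) {
        // Fire and forget: register the repo and initiate the snapshot,
        // returning as soon as the create call is accepted.
        SnapshotRunner.run(creator);
    } else {
        // Initiate, then block until the snapshot completes or fails.
        SnapshotRunner.runAndWaitForCompletion(creator);
    }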
2 changes: 1 addition & 1 deletion RFS/src/main/resources/log4j2.xml
@@ -2,7 +2,7 @@
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %msg%n"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} %p %c{1.} [%t] %m%n"/>
</Console>
</Appenders>
<Loggers>
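With the new pattern, a line that previously rendered as "14:03:22.118 INFO  Snapshot in progress..." would now carry the full UTC date, level, abbreviated logger name, and thread, roughly (timestamp and logger here are illustrative):

    2024-06-25 14:03:22,118 INFO c.r.w.SnapshotRunner [main] Snapshot in progress...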
@@ -48,9 +48,8 @@ def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument("--endpoint", help="Cluster endpoint e.g. http://test.elb.us-west-2.amazonaws.com:9200.")
parser.add_argument("--username", help="Cluster username.", default='admin')
parser.add_argument("--password", help="Cluster password.", default='admin')
parser.add_argument("--no-auth", action='store_true', help="Flag to provide no auth in requests.")
parser.add_argument("--username", help="Cluster username.")
parser.add_argument("--password", help="Cluster password.")
parser.add_argument("--no-clear-output", action='store_true',
help="Flag to not clear the output before each run. " +
"Helpful for piping to a file or other utility.")
@@ -111,7 +110,7 @@ def main():
args = parse_args()

url_base = args.endpoint or os.environ.get('SOURCE_DOMAIN_ENDPOINT', 'https://capture-proxy:9200')
auth = None if args.no_auth else (args.username, args.password)
auth = (args.username, args.password) if args.username and args.password else None

session = requests.Session()
keep_alive_headers = {'Connection': 'keep-alive'}