Skip to content

Commit

Permalink
Created an OpenSearchClient layer above the RestClient
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed Apr 30, 2024
1 parent 4552e66 commit c3e13a1
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 130 deletions.
13 changes: 8 additions & 5 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,10 @@ public static void main(String[] args) throws InterruptedException {
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to create the snapshot...");
OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
SnapshotCreator snapshotCreator = repo instanceof S3Repo
? new S3SnapshotCreator(snapshotName, sourceConnection, s3RepoUri, s3Region)
: new FileSystemSnapshotCreator(snapshotName, sourceConnection, snapshotLocalRepoDirPath.toString());
? new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region)
: new FileSystemSnapshotCreator(snapshotName, sourceClient, snapshotLocalRepoDirPath.toString());
snapshotCreator.registerRepo();
snapshotCreator.createSnapshot();
while (!snapshotCreator.isSnapshotFinished()) {
Expand Down Expand Up @@ -266,14 +267,15 @@ public static void main(String[] args) throws InterruptedException {
logger.info("==================================================================");
logger.info("Attempting to recreate the Global Metadata...");

OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
if (sourceVersion == ClusterVersion.ES_6_8) {
ObjectNode root = globalMetadata.toObjectNode();
ObjectNode transformedRoot = transformer.transformGlobalMetadata(root);
GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetConnection, Collections.emptyList(), templateWhitelist);
GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetClient, Collections.emptyList(), templateWhitelist);
} else if (sourceVersion == ClusterVersion.ES_7_10) {
ObjectNode root = globalMetadata.toObjectNode();
ObjectNode transformedRoot = transformer.transformGlobalMetadata(root);
GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetConnection, componentTemplateWhitelist, templateWhitelist);
GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetClient, componentTemplateWhitelist, templateWhitelist);
}
}

Expand Down Expand Up @@ -301,14 +303,15 @@ public static void main(String[] args) throws InterruptedException {
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to recreate the indices...");
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
for (IndexMetadata.Data indexMetadata : indexMetadatas) {
String reindexName = indexMetadata.getName() + indexSuffix;
logger.info("Recreating index " + indexMetadata.getName() + " as " + reindexName + " on target...");

ObjectNode root = indexMetadata.toObjectNode();
ObjectNode transformedRoot = transformer.transformIndexMetadata(root);
IndexMetadataData_OS_2_11 indexMetadataOS211 = new IndexMetadataData_OS_2_11(transformedRoot, indexMetadata.getId(), reindexName);
IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetConnection);
IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetClient);
}
}

Expand Down
10 changes: 4 additions & 6 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@ public class DocumentReindexer {
private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen

public static Mono<Void> reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
RestClient client = new RestClient(targetConnection);

String targetPath = indexName + "/_bulk";
OpenSearchClient client = new OpenSearchClient(targetConnection);

return documentStream
.map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
.buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size
.doOnNext(bulk -> logger.info(bulk.size() + " documents in current bulk request"))
.map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts
.flatMap(bulkJson -> client.postBulkAsync(targetPath, bulkJson) // Send the request
.flatMap(bulkJson -> client.sendBulkRequest(indexName, bulkJson) // Send the request
.doOnSuccess(unused -> logger.debug("Batch succeeded"))
.doOnError(error -> logger.error("Batch failed", error))
.onErrorResume(e -> Mono.empty()) // Prevent the error from stopping the entire stream
Expand Down Expand Up @@ -53,7 +51,7 @@ private static String convertToBulkRequestBody(List<String> bulkSections) {

public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception {
// Send the request
RestClient client = new RestClient(targetConnection);
client.get("_refresh", false);
OpenSearchClient client = new OpenSearchClient(targetConnection);
client.refresh();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ public class FileSystemSnapshotCreator extends SnapshotCreator {
private static final Logger logger = LogManager.getLogger(FileSystemSnapshotCreator.class);
private static final ObjectMapper mapper = new ObjectMapper();

private final ConnectionDetails connectionDetails;
private final OpenSearchClient client;
private final String snapshotName;
private final String snapshotRepoDirectoryPath;

public FileSystemSnapshotCreator(String snapshotName, ConnectionDetails connectionDetails, String snapshotRepoDirectoryPath) {
super(snapshotName, connectionDetails);
public FileSystemSnapshotCreator(String snapshotName, OpenSearchClient client, String snapshotRepoDirectoryPath) {
super(snapshotName, client);
this.snapshotName = snapshotName;
this.connectionDetails = connectionDetails;
this.client = client;
this.snapshotRepoDirectoryPath = snapshotRepoDirectoryPath;
}

Expand Down
127 changes: 127 additions & 0 deletions RFS/src/main/java/com/rfs/common/OpenSearchClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.rfs.common;

import java.net.HttpURLConnection;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.fasterxml.jackson.databind.node.ObjectNode;

import reactor.core.publisher.Mono;

public class OpenSearchClient {
private static final Logger logger = LogManager.getLogger(OpenSearchClient.class);

public static class BulkResponse extends RestClient.Response {
public BulkResponse(int responseCode, String responseBody, String responseMessage) {
super(responseCode, responseBody, responseMessage);
}

public boolean hasBadStatusCode() {
return !(code == HttpURLConnection.HTTP_OK || code == HttpURLConnection.HTTP_CREATED);
}

public boolean hasFailedOperations() {
return body.contains("\"errors\":true");
}

public String getFailureMessage() {
String failureMessage;
if (hasBadStatusCode()) {
failureMessage = "Bulk request failed. Status code: " + code + ", Response body: " + body;
} else {
failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body;
}

return failureMessage;
}
}

public final ConnectionDetails connectionDetails;
private final RestClient client;

public OpenSearchClient(ConnectionDetails connectionDetails) {
this.connectionDetails = connectionDetails;
this.client = new RestClient(connectionDetails);
}

/*
* Create a legacy template if it does not already exist; return true if created, false otherwise.
*/
public boolean createLegacyTemplateIdempotent(String templateName, ObjectNode settings){
String targetPath = "_template/" + templateName;
return createObjectIdempotent(targetPath, settings);
}

/*
* Create a component template if it does not already exist; return true if created, false otherwise.
*/
public boolean createComponentTemplateIdempotent(String templateName, ObjectNode settings){
String targetPath = "_component_template/" + templateName;
return createObjectIdempotent(targetPath, settings);
}

/*
* Create an index template if it does not already exist; return true if created, false otherwise.
*/
public boolean createIndexTemplateIdempotent(String templateName, ObjectNode settings){
String targetPath = "_index_template/" + templateName;
return createObjectIdempotent(targetPath, settings);
}

/*
* Create an index if it does not already exist; return true if created, false otherwise.
*/
public boolean createIndexIdempotent(String indexName, ObjectNode settings){
String targetPath = indexName;
return createObjectIdempotent(targetPath, settings);
}

private boolean createObjectIdempotent(String objectPath, ObjectNode settings){
RestClient.Response response = client.get(objectPath, true);
if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
client.put(objectPath, settings.toString(), false);
return true;
} else if (response.code == HttpURLConnection.HTTP_OK) {
logger.warn(objectPath + " already exists. Skipping creation.");
} else {
logger.warn("Could not confirm that " + objectPath + " does not already exist. Skipping creation.");
}
return false;
}

public RestClient.Response registerSnapshotRepo(String repoName, ObjectNode settings){
String targetPath = "_snapshot/" + repoName;
return client.put(targetPath, settings.toString(), false);
}

public RestClient.Response createSnapshot(String repoName, String snapshotName, ObjectNode settings){
String targetPath = "_snapshot/" + repoName + "/" + snapshotName;
return client.put(targetPath, settings.toString(), false);
}

public RestClient.Response getSnapshotStatus(String repoName, String snapshotName){
String targetPath = "_snapshot/" + repoName + "/" + snapshotName;
return client.get(targetPath, false);
}

public Mono<BulkResponse> sendBulkRequest(String indexName, String body) {
String targetPath = indexName + "/_bulk";

return client.postAsync(targetPath, body)
.map(response -> new BulkResponse(response.code, response.body, response.message))
.flatMap(responseDetails -> {
if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) {
logger.error(responseDetails.getFailureMessage());
return Mono.error(new RuntimeException(responseDetails.getFailureMessage()));
}
return Mono.just(responseDetails);
});
}

public RestClient.Response refresh() {
String targetPath = "_refresh";

return client.get(targetPath, false);
}
}
36 changes: 2 additions & 34 deletions RFS/src/main/java/com/rfs/common/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,6 @@ public Response(int responseCode, String responseBody, String responseMessage) {
}
}

public static class BulkResponse extends Response {
public BulkResponse(int responseCode, String responseBody, String responseMessage) {
super(responseCode, responseBody, responseMessage);
}

public boolean hasBadStatusCode() {
return !(code == 200 || code == 201);
}

public boolean hasFailedOperations() {
return body.contains("\"errors\":true");
}

public String getFailureMessage() {
String failureMessage;
if (hasBadStatusCode()) {
failureMessage = "Bulk request failed. Status code: " + code + ", Response body: " + body;
} else {
failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body;
}

return failureMessage;
}
}

public final ConnectionDetails connectionDetails;
private final HttpClient client;

Expand Down Expand Up @@ -79,19 +54,12 @@ public Response get(String path, boolean quietLogging) {
return getAsync(path, quietLogging).block();
}

public Mono<BulkResponse> postBulkAsync(String path, String body) {
public Mono<Response> postAsync(String path, String body) {
return client.post()
.uri("/" + path)
.send(ByteBufMono.fromString(Mono.just(body)))
.responseSingle((response, bytes) -> bytes.asString()
.map(b -> new BulkResponse(response.status().code(), b, response.status().reasonPhrase()))
.flatMap(responseDetails -> {
if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) {
logger.error(responseDetails.getFailureMessage());
return Mono.error(new RuntimeException(responseDetails.getFailureMessage()));
}
return Mono.just(responseDetails);
}));
.map(b -> new Response(response.status().code(), b, response.status().reasonPhrase())));
}

public Mono<Response> putAsync(String path, String body, boolean quietLogging) {
Expand Down
8 changes: 4 additions & 4 deletions RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ public class S3SnapshotCreator extends SnapshotCreator {
private static final Logger logger = LogManager.getLogger(S3SnapshotCreator.class);
private static final ObjectMapper mapper = new ObjectMapper();

private final ConnectionDetails connectionDetails;
private final OpenSearchClient client;
private final String snapshotName;
private final String s3Uri;
private final String s3Region;

public S3SnapshotCreator(String snapshotName, ConnectionDetails connectionDetails, String s3Uri, String s3Region) {
super(snapshotName, connectionDetails);
public S3SnapshotCreator(String snapshotName, OpenSearchClient client, String s3Uri, String s3Region) {
super(snapshotName, client);
this.snapshotName = snapshotName;
this.connectionDetails = connectionDetails;
this.client = client;
this.s3Uri = s3Uri;
this.s3Region = s3Region;
}
Expand Down
30 changes: 8 additions & 22 deletions RFS/src/main/java/com/rfs/common/SnapshotCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ public abstract class SnapshotCreator {
private static final Logger logger = LogManager.getLogger(SnapshotCreator.class);
private static final ObjectMapper mapper = new ObjectMapper();

private final ConnectionDetails connectionDetails;
private final OpenSearchClient client;
@Getter
private final String snapshotName;

public SnapshotCreator(String snapshotName, ConnectionDetails connectionDetails) {
public SnapshotCreator(String snapshotName, OpenSearchClient client) {
this.snapshotName = snapshotName;
this.connectionDetails = connectionDetails;
this.client = client;
}

abstract ObjectNode getRequestBodyForRegisterRepo();
Expand All @@ -29,15 +29,10 @@ public String getRepoName() {
}

public void registerRepo() throws Exception {
// Assemble the REST path
String targetName = "_snapshot/" + getRepoName();

ObjectNode body = getRequestBodyForRegisterRepo();
ObjectNode settings = getRequestBodyForRegisterRepo();

// Register the repo; it's fine if it already exists
RestClient client = new RestClient(connectionDetails);
String bodyString = body.toString();
RestClient.Response response = client.put(targetName, bodyString, false);
RestClient.Response response = client.registerSnapshotRepo(getRepoName(), settings);
if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) {
logger.info("Snapshot repo registration successful");
} else {
Expand All @@ -47,19 +42,14 @@ public void registerRepo() throws Exception {
}

public void createSnapshot() throws Exception {
// Assemble the REST path
String targetName = "_snapshot/" + getRepoName() + "/" + getSnapshotName();

// Assemble the request body
// Assemble the settings
ObjectNode body = mapper.createObjectNode();
body.put("indices", "_all");
body.put("ignore_unavailable", true);
body.put("include_global_state", true);

// Register the repo; idempotent operation
RestClient client = new RestClient(connectionDetails);
String bodyString = body.toString();
RestClient.Response response = client.put(targetName, bodyString, false);
RestClient.Response response = client.createSnapshot(getRepoName(), getSnapshotName(), body);
if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) {
logger.info("Snapshot " + getSnapshotName() + " creation initiated");
} else {
Expand All @@ -69,12 +59,8 @@ public void createSnapshot() throws Exception {
}

public boolean isSnapshotFinished() throws Exception {
// Assemble the REST path
String targetName = "_snapshot/" + getRepoName() + "/" + getSnapshotName();

// Check if the snapshot has finished
RestClient client = new RestClient(connectionDetails);
RestClient.Response response = client.get(targetName, false);
RestClient.Response response = client.getSnapshotStatus(getRepoName(), getSnapshotName());
if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
logger.error("Snapshot " + getSnapshotName() + " does not exist");
throw new SnapshotDoesNotExist(getSnapshotName());
Expand Down
Loading

0 comments on commit c3e13a1

Please sign in to comment.