diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index 797866083..ba2120d10 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -182,9 +182,10 @@ public static void main(String[] args) throws InterruptedException { // ========================================================================================================== logger.info("=================================================================="); logger.info("Attempting to create the snapshot..."); + OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection); SnapshotCreator snapshotCreator = repo instanceof S3Repo - ? new S3SnapshotCreator(snapshotName, sourceConnection, s3RepoUri, s3Region) - : new FileSystemSnapshotCreator(snapshotName, sourceConnection, snapshotLocalRepoDirPath.toString()); + ? new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region) + : new FileSystemSnapshotCreator(snapshotName, sourceClient, snapshotLocalRepoDirPath.toString()); snapshotCreator.registerRepo(); snapshotCreator.createSnapshot(); while (!snapshotCreator.isSnapshotFinished()) { @@ -266,14 +267,15 @@ public static void main(String[] args) throws InterruptedException { logger.info("=================================================================="); logger.info("Attempting to recreate the Global Metadata..."); + OpenSearchClient targetClient = new OpenSearchClient(targetConnection); if (sourceVersion == ClusterVersion.ES_6_8) { ObjectNode root = globalMetadata.toObjectNode(); ObjectNode transformedRoot = transformer.transformGlobalMetadata(root); - GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetConnection, Collections.emptyList(), templateWhitelist); + GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetClient, Collections.emptyList(), templateWhitelist); } else if (sourceVersion == ClusterVersion.ES_7_10) { ObjectNode root = globalMetadata.toObjectNode(); ObjectNode transformedRoot = transformer.transformGlobalMetadata(root); - GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetConnection, componentTemplateWhitelist, templateWhitelist); + GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetClient, componentTemplateWhitelist, templateWhitelist); } } @@ -301,6 +303,7 @@ public static void main(String[] args) throws InterruptedException { // ========================================================================================================== logger.info("=================================================================="); logger.info("Attempting to recreate the indices..."); + OpenSearchClient targetClient = new OpenSearchClient(targetConnection); for (IndexMetadata.Data indexMetadata : indexMetadatas) { String reindexName = indexMetadata.getName() + indexSuffix; logger.info("Recreating index " + indexMetadata.getName() + " as " + reindexName + " on target..."); @@ -308,7 +311,7 @@ public static void main(String[] args) throws InterruptedException { ObjectNode root = indexMetadata.toObjectNode(); ObjectNode transformedRoot = transformer.transformIndexMetadata(root); IndexMetadataData_OS_2_11 indexMetadataOS211 = new IndexMetadataData_OS_2_11(transformedRoot, indexMetadata.getId(), reindexName); - IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetConnection); + IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetClient); } } diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java index 11d10fb62..f4c64e293 100644 --- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java +++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java @@ -16,16 +16,14 @@ public class DocumentReindexer { private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen public static Mono reindex(String indexName, Flux documentStream, ConnectionDetails targetConnection) throws Exception { - RestClient client = new RestClient(targetConnection); - - String targetPath = indexName + "/_bulk"; + OpenSearchClient client = new OpenSearchClient(targetConnection); return documentStream .map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size .doOnNext(bulk -> logger.info(bulk.size() + " documents in current bulk request")) .map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts - .flatMap(bulkJson -> client.postBulkAsync(targetPath, bulkJson) // Send the request + .flatMap(bulkJson -> client.sendBulkRequest(indexName, bulkJson) // Send the request .doOnSuccess(unused -> logger.debug("Batch succeeded")) .doOnError(error -> logger.error("Batch failed", error)) .onErrorResume(e -> Mono.empty()) // Prevent the error from stopping the entire stream @@ -53,7 +51,7 @@ private static String convertToBulkRequestBody(List bulkSections) { public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception { // Send the request - RestClient client = new RestClient(targetConnection); - client.get("_refresh", false); + OpenSearchClient client = new OpenSearchClient(targetConnection); + client.refresh(); } } diff --git a/RFS/src/main/java/com/rfs/common/FileSystemSnapshotCreator.java b/RFS/src/main/java/com/rfs/common/FileSystemSnapshotCreator.java index d8a947939..a4f122b47 100644 --- a/RFS/src/main/java/com/rfs/common/FileSystemSnapshotCreator.java +++ b/RFS/src/main/java/com/rfs/common/FileSystemSnapshotCreator.java @@ -9,14 +9,14 @@ public class FileSystemSnapshotCreator extends SnapshotCreator { private static final Logger logger = LogManager.getLogger(FileSystemSnapshotCreator.class); private static final ObjectMapper mapper = new ObjectMapper(); - private final ConnectionDetails connectionDetails; + private final OpenSearchClient client; private final String snapshotName; private final String snapshotRepoDirectoryPath; - public FileSystemSnapshotCreator(String snapshotName, ConnectionDetails connectionDetails, String snapshotRepoDirectoryPath) { - super(snapshotName, connectionDetails); + public FileSystemSnapshotCreator(String snapshotName, OpenSearchClient client, String snapshotRepoDirectoryPath) { + super(snapshotName, client); this.snapshotName = snapshotName; - this.connectionDetails = connectionDetails; + this.client = client; this.snapshotRepoDirectoryPath = snapshotRepoDirectoryPath; } diff --git a/RFS/src/main/java/com/rfs/common/OpenSearchClient.java b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java new file mode 100644 index 000000000..a21bd9057 --- /dev/null +++ b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java @@ -0,0 +1,127 @@ +package com.rfs.common; + +import java.net.HttpURLConnection; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +import reactor.core.publisher.Mono; + +public class OpenSearchClient { + private static final Logger logger = LogManager.getLogger(OpenSearchClient.class); + + public static class BulkResponse extends RestClient.Response { + public BulkResponse(int responseCode, String responseBody, String responseMessage) { + super(responseCode, responseBody, responseMessage); + } + + public boolean hasBadStatusCode() { + return !(code == HttpURLConnection.HTTP_OK || code == HttpURLConnection.HTTP_CREATED); + } + + public boolean hasFailedOperations() { + return body.contains("\"errors\":true"); + } + + public String getFailureMessage() { + String failureMessage; + if (hasBadStatusCode()) { + failureMessage = "Bulk request failed. Status code: " + code + ", Response body: " + body; + } else { + failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body; + } + + return failureMessage; + } + } + + public final ConnectionDetails connectionDetails; + private final RestClient client; + + public OpenSearchClient(ConnectionDetails connectionDetails) { + this.connectionDetails = connectionDetails; + this.client = new RestClient(connectionDetails); + } + + /* + * Create a legacy template if it does not already exist; return true if created, false otherwise. + */ + public boolean createLegacyTemplateIdempotent(String templateName, ObjectNode settings){ + String targetPath = "_template/" + templateName; + return createObjectIdempotent(targetPath, settings); + } + + /* + * Create a component template if it does not already exist; return true if created, false otherwise. + */ + public boolean createComponentTemplateIdempotent(String templateName, ObjectNode settings){ + String targetPath = "_component_template/" + templateName; + return createObjectIdempotent(targetPath, settings); + } + + /* + * Create an index template if it does not already exist; return true if created, false otherwise. + */ + public boolean createIndexTemplateIdempotent(String templateName, ObjectNode settings){ + String targetPath = "_index_template/" + templateName; + return createObjectIdempotent(targetPath, settings); + } + + /* + * Create an index if it does not already exist; return true if created, false otherwise. + */ + public boolean createIndexIdempotent(String indexName, ObjectNode settings){ + String targetPath = indexName; + return createObjectIdempotent(targetPath, settings); + } + + private boolean createObjectIdempotent(String objectPath, ObjectNode settings){ + RestClient.Response response = client.get(objectPath, true); + if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { + client.put(objectPath, settings.toString(), false); + return true; + } else if (response.code == HttpURLConnection.HTTP_OK) { + logger.warn(objectPath + " already exists. Skipping creation."); + } else { + logger.warn("Could not confirm that " + objectPath + " does not already exist. Skipping creation."); + } + return false; + } + + public RestClient.Response registerSnapshotRepo(String repoName, ObjectNode settings){ + String targetPath = "_snapshot/" + repoName; + return client.put(targetPath, settings.toString(), false); + } + + public RestClient.Response createSnapshot(String repoName, String snapshotName, ObjectNode settings){ + String targetPath = "_snapshot/" + repoName + "/" + snapshotName; + return client.put(targetPath, settings.toString(), false); + } + + public RestClient.Response getSnapshotStatus(String repoName, String snapshotName){ + String targetPath = "_snapshot/" + repoName + "/" + snapshotName; + return client.get(targetPath, false); + } + + public Mono sendBulkRequest(String indexName, String body) { + String targetPath = indexName + "/_bulk"; + + return client.postAsync(targetPath, body) + .map(response -> new BulkResponse(response.code, response.body, response.message)) + .flatMap(responseDetails -> { + if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) { + logger.error(responseDetails.getFailureMessage()); + return Mono.error(new RuntimeException(responseDetails.getFailureMessage())); + } + return Mono.just(responseDetails); + }); + } + + public RestClient.Response refresh() { + String targetPath = "_refresh"; + + return client.get(targetPath, false); + } +} diff --git a/RFS/src/main/java/com/rfs/common/RestClient.java b/RFS/src/main/java/com/rfs/common/RestClient.java index 98c32826a..17fc54860 100644 --- a/RFS/src/main/java/com/rfs/common/RestClient.java +++ b/RFS/src/main/java/com/rfs/common/RestClient.java @@ -24,31 +24,6 @@ public Response(int responseCode, String responseBody, String responseMessage) { } } - public static class BulkResponse extends Response { - public BulkResponse(int responseCode, String responseBody, String responseMessage) { - super(responseCode, responseBody, responseMessage); - } - - public boolean hasBadStatusCode() { - return !(code == 200 || code == 201); - } - - public boolean hasFailedOperations() { - return body.contains("\"errors\":true"); - } - - public String getFailureMessage() { - String failureMessage; - if (hasBadStatusCode()) { - failureMessage = "Bulk request failed. Status code: " + code + ", Response body: " + body; - } else { - failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body; - } - - return failureMessage; - } - } - public final ConnectionDetails connectionDetails; private final HttpClient client; @@ -79,19 +54,12 @@ public Response get(String path, boolean quietLogging) { return getAsync(path, quietLogging).block(); } - public Mono postBulkAsync(String path, String body) { + public Mono postAsync(String path, String body) { return client.post() .uri("/" + path) .send(ByteBufMono.fromString(Mono.just(body))) .responseSingle((response, bytes) -> bytes.asString() - .map(b -> new BulkResponse(response.status().code(), b, response.status().reasonPhrase())) - .flatMap(responseDetails -> { - if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) { - logger.error(responseDetails.getFailureMessage()); - return Mono.error(new RuntimeException(responseDetails.getFailureMessage())); - } - return Mono.just(responseDetails); - })); + .map(b -> new Response(response.status().code(), b, response.status().reasonPhrase()))); } public Mono putAsync(String path, String body, boolean quietLogging) { diff --git a/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java b/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java index 150b3430e..7832f05b5 100644 --- a/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java +++ b/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java @@ -10,15 +10,15 @@ public class S3SnapshotCreator extends SnapshotCreator { private static final Logger logger = LogManager.getLogger(S3SnapshotCreator.class); private static final ObjectMapper mapper = new ObjectMapper(); - private final ConnectionDetails connectionDetails; + private final OpenSearchClient client; private final String snapshotName; private final String s3Uri; private final String s3Region; - public S3SnapshotCreator(String snapshotName, ConnectionDetails connectionDetails, String s3Uri, String s3Region) { - super(snapshotName, connectionDetails); + public S3SnapshotCreator(String snapshotName, OpenSearchClient client, String s3Uri, String s3Region) { + super(snapshotName, client); this.snapshotName = snapshotName; - this.connectionDetails = connectionDetails; + this.client = client; this.s3Uri = s3Uri; this.s3Region = s3Region; } diff --git a/RFS/src/main/java/com/rfs/common/SnapshotCreator.java b/RFS/src/main/java/com/rfs/common/SnapshotCreator.java index f999744ba..5ec9bbb95 100644 --- a/RFS/src/main/java/com/rfs/common/SnapshotCreator.java +++ b/RFS/src/main/java/com/rfs/common/SnapshotCreator.java @@ -13,13 +13,13 @@ public abstract class SnapshotCreator { private static final Logger logger = LogManager.getLogger(SnapshotCreator.class); private static final ObjectMapper mapper = new ObjectMapper(); - private final ConnectionDetails connectionDetails; + private final OpenSearchClient client; @Getter private final String snapshotName; - public SnapshotCreator(String snapshotName, ConnectionDetails connectionDetails) { + public SnapshotCreator(String snapshotName, OpenSearchClient client) { this.snapshotName = snapshotName; - this.connectionDetails = connectionDetails; + this.client = client; } abstract ObjectNode getRequestBodyForRegisterRepo(); @@ -29,15 +29,10 @@ public String getRepoName() { } public void registerRepo() throws Exception { - // Assemble the REST path - String targetName = "_snapshot/" + getRepoName(); - - ObjectNode body = getRequestBodyForRegisterRepo(); + ObjectNode settings = getRequestBodyForRegisterRepo(); // Register the repo; it's fine if it already exists - RestClient client = new RestClient(connectionDetails); - String bodyString = body.toString(); - RestClient.Response response = client.put(targetName, bodyString, false); + RestClient.Response response = client.registerSnapshotRepo(getRepoName(), settings); if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) { logger.info("Snapshot repo registration successful"); } else { @@ -47,19 +42,14 @@ public void registerRepo() throws Exception { } public void createSnapshot() throws Exception { - // Assemble the REST path - String targetName = "_snapshot/" + getRepoName() + "/" + getSnapshotName(); - - // Assemble the request body + // Assemble the settings ObjectNode body = mapper.createObjectNode(); body.put("indices", "_all"); body.put("ignore_unavailable", true); body.put("include_global_state", true); // Register the repo; idempotent operation - RestClient client = new RestClient(connectionDetails); - String bodyString = body.toString(); - RestClient.Response response = client.put(targetName, bodyString, false); + RestClient.Response response = client.createSnapshot(getRepoName(), getSnapshotName(), body); if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) { logger.info("Snapshot " + getSnapshotName() + " creation initiated"); } else { @@ -69,12 +59,8 @@ public void createSnapshot() throws Exception { } public boolean isSnapshotFinished() throws Exception { - // Assemble the REST path - String targetName = "_snapshot/" + getRepoName() + "/" + getSnapshotName(); - // Check if the snapshot has finished - RestClient client = new RestClient(connectionDetails); - RestClient.Response response = client.get(targetName, false); + RestClient.Response response = client.getSnapshotStatus(getRepoName(), getSnapshotName()); if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { logger.error("Snapshot " + getSnapshotName() + " does not exist"); throw new SnapshotDoesNotExist(getSnapshotName()); diff --git a/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java b/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java index 9216a4f60..4ba48c4d7 100644 --- a/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java +++ b/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java @@ -1,6 +1,5 @@ package com.rfs.version_os_2_11; -import java.net.HttpURLConnection; import java.util.ArrayList; import java.util.List; @@ -8,22 +7,21 @@ import org.apache.logging.log4j.Logger; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.rfs.common.ConnectionDetails; -import com.rfs.common.RestClient; +import com.rfs.common.OpenSearchClient; public class GlobalMetadataCreator_OS_2_11 { private static final Logger logger = LogManager.getLogger(GlobalMetadataCreator_OS_2_11.class); - public static void create(ObjectNode root, ConnectionDetails connectionDetails, List componentTemplateWhitelist, List indexTemplateWhitelist) throws Exception { + public static void create(ObjectNode root, OpenSearchClient client, List componentTemplateWhitelist, List indexTemplateWhitelist) throws Exception { logger.info("Setting Global Metadata"); GlobalMetadataData_OS_2_11 globalMetadata = new GlobalMetadataData_OS_2_11(root); - createTemplates(globalMetadata, connectionDetails, indexTemplateWhitelist); - createComponentTemplates(globalMetadata, connectionDetails, componentTemplateWhitelist); - createIndexTemplates(globalMetadata, connectionDetails, indexTemplateWhitelist); + createTemplates(globalMetadata, client, indexTemplateWhitelist); + createComponentTemplates(globalMetadata, client, componentTemplateWhitelist); + createIndexTemplates(globalMetadata, client, indexTemplateWhitelist); } - public static void createTemplates(GlobalMetadataData_OS_2_11 globalMetadata, ConnectionDetails connectionDetails, List indexTemplateWhitelist) throws Exception { + public static void createTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List indexTemplateWhitelist) throws Exception { logger.info("Setting Legacy Templates"); ObjectNode templates = globalMetadata.getTemplates(); @@ -41,8 +39,7 @@ public static void createTemplates(GlobalMetadataData_OS_2_11 globalMetadata, Co logger.info("Setting Legacy Template: " + templateName); ObjectNode settings = (ObjectNode) globalMetadata.getTemplates().get(templateName); - String path = "_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createLegacyTemplateIdempotent(templateName, settings); } } else { // Get the template names @@ -53,13 +50,12 @@ public static void createTemplates(GlobalMetadataData_OS_2_11 globalMetadata, Co for (String templateName : templateKeys) { logger.info("Setting Legacy Template: " + templateName); ObjectNode settings = (ObjectNode) templates.get(templateName); - String path = "_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createLegacyTemplateIdempotent(templateName, settings); } } } - public static void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMetadata, ConnectionDetails connectionDetails, List indexTemplateWhitelist) throws Exception { + public static void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List indexTemplateWhitelist) throws Exception { logger.info("Setting Component Templates"); ObjectNode templates = globalMetadata.getComponentTemplates(); @@ -77,8 +73,7 @@ public static void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMet logger.info("Setting Component Template: " + templateName); ObjectNode settings = (ObjectNode) templates.get(templateName); - String path = "_component_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createComponentTemplateIdempotent(templateName, settings); } } else { // Get the template names @@ -89,13 +84,12 @@ public static void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMet for (String templateName : templateKeys) { logger.info("Setting Component Template: " + templateName); ObjectNode settings = (ObjectNode) templates.get(templateName); - String path = "_component_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createComponentTemplateIdempotent(templateName, settings); } } } - public static void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadata, ConnectionDetails connectionDetails, List indexTemplateWhitelist) throws Exception { + public static void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List indexTemplateWhitelist) throws Exception { logger.info("Setting Index Templates"); ObjectNode templates = globalMetadata.getIndexTemplates(); @@ -113,8 +107,7 @@ public static void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadat logger.info("Setting Index Template: " + templateName); ObjectNode settings = (ObjectNode) globalMetadata.getIndexTemplates().get(templateName); - String path = "_index_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createIndexTemplateIdempotent(templateName, settings); } } else { // Get the template names @@ -125,26 +118,8 @@ public static void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadat for (String templateName : templateKeys) { logger.info("Setting Index Template: " + templateName); ObjectNode settings = (ObjectNode) templates.get(templateName); - String path = "_index_template/" + templateName; - createEntity(templateName, settings, connectionDetails, path); + client.createIndexTemplateIdempotent(templateName, settings); } } } - - private static void createEntity(String entityName, ObjectNode settings, ConnectionDetails connectionDetails, String path) throws Exception { - // Assemble the request details - String body = settings.toString(); - - // Confirm the index doesn't already exist, then create it - RestClient client = new RestClient(connectionDetails); - RestClient.Response response = client.get(path, true); - if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { - String bodyString = body.toString(); - client.put(path, bodyString, false); - } else if (response.code == HttpURLConnection.HTTP_OK) { - logger.warn(entityName + " already exists. Skipping creation."); - } else { - logger.warn("Could not confirm that " + entityName + " does not already exist. Skipping creation."); - } - } } diff --git a/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java b/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java index c3de7b5ef..58661aa66 100644 --- a/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java +++ b/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java @@ -1,21 +1,18 @@ package com.rfs.version_os_2_11; -import java.net.HttpURLConnection; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.rfs.common.ConnectionDetails; import com.rfs.common.IndexMetadata; -import com.rfs.common.RestClient; +import com.rfs.common.OpenSearchClient; public class IndexCreator_OS_2_11 { private static final Logger logger = LogManager.getLogger(IndexCreator_OS_2_11.class); private static final ObjectMapper mapper = new ObjectMapper(); - public static void create(String targetName, IndexMetadata.Data indexMetadata, ConnectionDetails connectionDetails) throws Exception { + public static void create(String indexName, IndexMetadata.Data indexMetadata, OpenSearchClient client) throws Exception { // Remove some settings which will cause errors if you try to pass them to the API ObjectNode settings = indexMetadata.getSettings(); @@ -30,16 +27,7 @@ public static void create(String targetName, IndexMetadata.Data indexMetadata, C body.set("mappings", indexMetadata.getMappings()); body.set("settings", settings); - // Confirm the index doesn't already exist, then create it - RestClient client = new RestClient(connectionDetails); - RestClient.Response response = client.get(targetName, true); - if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { - String bodyString = body.toString(); - client.put(targetName, bodyString, false); - } else if (response.code == HttpURLConnection.HTTP_OK) { - logger.warn("Index " + targetName + " already exists. Skipping creation."); - } else { - logger.warn("Could not confirm that index " + targetName + " does not already exist. Skipping creation."); - } + // Idempotently create the index + client.createIndexIdempotent(indexName, body); } }