Skip to content

Commit

Permalink
Merge branch 'main' into RfsInstrumentation + fixup some unit tests t…
Browse files Browse the repository at this point in the history
…hat were not context aware.

Signed-off-by: Greg Schohn <greg.schohn@gmail.com>

# Conflicts:
#	RFS/build.gradle
#	RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
#	RFS/src/main/java/com/rfs/RunRfsWorker.java
#	RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java
#	RFS/src/main/java/com/rfs/common/DocumentReindexer.java
#	RFS/src/main/java/com/rfs/common/OpenSearchClient.java
#	RFS/src/main/java/com/rfs/common/SnapshotCreator.java
  • Loading branch information
gregschohn committed May 29, 2024
2 parents 4a0ddf1 + fab14e2 commit 9e11ff0
Show file tree
Hide file tree
Showing 29 changed files with 1,219 additions and 414 deletions.
17 changes: 11 additions & 6 deletions .github/workflows/gradle-build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ jobs:
gradle-home-cache-cleanup: true

- name: Run Gradle Build
run: ../gradlew build -x test --scan --stacktrace
working-directory: TrafficCapture
run: ./gradlew build -x test --scan --stacktrace
env:
OS_MIGRATIONS_GRADLE_SCAN_TOS_AGREE_AND_ENABLED: ''

- name: Run Tests with Coverage
run: ../gradlew test jacocoTestReport --scan --stacktrace
working-directory: TrafficCapture
run: ./gradlew test jacocoTestReport --scan --stacktrace
env:
OS_MIGRATIONS_GRADLE_SCAN_TOS_AGREE_AND_ENABLED: ''

Expand All @@ -40,11 +38,18 @@ jobs:
with:
name: traffic-capture-test-reports
path: |
./TrafficCapture/*/build/reports/
./TrafficCapture*/*/build/reports/
- uses: actions/upload-artifact@v4
if: always()
with:
name: RFS-test-reports
path: |
./RFS*/*/build/reports/
- name: Upload to Codecov
uses: codecov/codecov-action@v4
with:
files: "TrafficCapture/**/jacocoTestReport.xml"
files: "**/jacocoTestReport.xml"
flags: unittests
fail_ci_if_error: false
4 changes: 2 additions & 2 deletions .github/workflows/release-drafter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ jobs:
# Preface Traffic Capture version with 0. to signal interface immaturity
run: |
wget https://github.com/opensearch-project/opensearch-migrations/archive/refs/tags/${{ steps.get_data.outputs.version }}.tar.gz -O artifacts.tar.gz
(cd TrafficCapture && ../gradlew publishMavenJavaPublicationToMavenRepository -Dbuild.snapshot=false -Dbuild.version=0.${{ steps.get_data.outputs.version }} && tar -C build -cvf traffic-capture-artifacts.tar.gz repository)
./gradlew publishMavenJavaPublicationToMavenRepository -Dbuild.snapshot=false -Dbuild.version=0.${{ steps.get_data.outputs.version }} && tar -C build -cvf traffic-capture-artifacts.tar.gz repository
- name: Draft a release
uses: softprops/action-gh-release@v2
with:
draft: true
generate_release_notes: true
files: |
artifacts.tar.gz
TrafficCapture/traffic-capture-artifacts.tar.gz
traffic-capture-artifacts.tar.gz
60 changes: 36 additions & 24 deletions RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,41 +33,53 @@ ext {
}

dependencies {
implementation project(":commonDependencyVersionConstraints")

implementation project(':coreUtilities')

implementation 'com.beust:jcommander:1.81'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.2'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.16.2'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.16.2'
implementation 'com.fasterxml.jackson.core:jackson-core:2.16.2'
implementation 'io.netty:netty-codec-http:4.1.108.Final'
implementation 'org.apache.logging.log4j:log4j-api:2.23.1'
implementation 'org.apache.logging.log4j:log4j-core:2.23.1'
implementation 'org.apache.lucene:lucene-core:8.11.3'
implementation 'org.apache.lucene:lucene-analyzers-common:8.11.3'
implementation 'org.apache.lucene:lucene-backward-codecs:8.11.3'
implementation group: 'com.beust', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-smile'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
implementation group: 'io.netty', name: 'netty-codec-http'
implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core'
implementation group: 'org.apache.lucene', name: 'lucene-core'
implementation group: 'org.apache.lucene', name: 'lucene-analyzers-common'
implementation group: 'org.apache.lucene', name: 'lucene-backward-codecs'

implementation platform('io.projectreactor:reactor-bom:2023.0.5')
implementation 'io.projectreactor.netty:reactor-netty-core'
implementation 'io.projectreactor.netty:reactor-netty-http'

implementation platform("software.amazon.awssdk:bom:$awsSdkVersion")
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:s3-transfer-manager'
implementation 'software.amazon.awssdk.crt:aws-crt:0.29.18'
implementation group: 'software.amazon.awssdk', name: 's3'
implementation group: 'software.amazon.awssdk', name: 's3-transfer-manager'

testImplementation 'com.github.docker-java:docker-java-core:3.3.6'
testImplementation 'com.github.docker-java:docker-java-transport-httpclient5:3.3.6'

testImplementation group: 'io.projectreactor', name: 'reactor-test'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params'
testImplementation group: 'org.mockito', name: 'mockito-core'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'

implementation 'com.github.docker-java:docker-java-core:3.3.6'
implementation 'com.github.docker-java:docker-java-transport-httpclient5:3.3.6'

// Integration tests
testImplementation testFixtures(project(path: ':testHelperFixtures'))
testImplementation testFixtures(project(path: ':coreUtilities'))
testImplementation 'io.projectreactor:reactor-test:3.6.5'
testImplementation 'org.apache.logging.log4j:log4j-core:2.23.1'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.10.2'
testImplementation 'org.mockito:mockito-core:5.11.0'
testImplementation 'org.mockito:mockito-junit-jupiter:5.11.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.2'
testImplementation group: 'org.testcontainers', name: 'testcontainers'
testImplementation group: 'org.hamcrest', name: 'hamcrest'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api'
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'
// why are these under implementation with the above comment for integ tests?
implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'
}

application {
Expand Down
6 changes: 3 additions & 3 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public static void main(String[] args) throws InterruptedException {
new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType()));


ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

// Sanity checks
Expand Down Expand Up @@ -312,13 +312,13 @@ public static void main(String[] args) throws InterruptedException {
}
logger.info("Index Metadata read successfully");

OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
if ((movementType == MovementType.EVERYTHING) || (movementType == MovementType.METADATA)){
// ==========================================================================================================
// Recreate the Indices
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to recreate the indices...");
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
for (IndexMetadata.Data indexMetadata : indexMetadatas) {
String reindexName = indexMetadata.getName() + indexSuffix;
logger.info("Recreating index " + indexMetadata.getName() + " as " + reindexName + " on target...");
Expand Down Expand Up @@ -379,7 +379,7 @@ public static void main(String[] args) throws InterruptedException {
String targetIndex = indexMetadata.getName() + indexSuffix;

final int finalShardId = shardId; // Define in local context for the lambda
DocumentReindexer.reindex(targetIndex, documents, targetConnection, reindexCtx)
DocumentReindexer.reindex(targetIndex, documents, targetClient, reindexCtx)
.doOnError(error -> logger.error("Error during reindexing: " + error))
.doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + finalShardId))
// Wait for the shard reindexing to complete before proceeding; fine in this demo script, but
Expand Down
31 changes: 30 additions & 1 deletion RFS/src/main/java/com/rfs/RunRfsWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import com.rfs.tracing.RootRfsContext;
import org.apache.logging.log4j.Level;
Expand All @@ -14,6 +18,7 @@


import com.rfs.cms.CmsClient;
import com.rfs.cms.CmsEntry;
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.common.ClusterVersion;
import com.rfs.common.ConnectionDetails;
Expand All @@ -33,7 +38,10 @@
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.worker.GlobalState;
import com.rfs.worker.MetadataRunner;
import com.rfs.worker.Runner;
import com.rfs.worker.SnapshotRunner;
import com.rfs.worker.WorkerStep;

import org.opensearch.migrations.tracing.ActiveContextTracker;
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
Expand Down Expand Up @@ -153,9 +161,30 @@ public static void main(String[] args) throws Exception {
MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer);
metadataWorker.run();

} catch (Runner.PhaseFailed e) {
logPhaseFailureRecord(e.phase, e.nextStep, e.cmsEntry, e.e);
throw e;
} catch (Exception e) {
logger.error("Error running RfsWorker", e);
logger.error("Unexpected error running RfsWorker", e);
throw e;
}
}

public static void logPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nextStep, Optional<CmsEntry.Base> cmsEntry, Exception e) {
ObjectNode errorBlob = new ObjectMapper().createObjectNode();
errorBlob.put("exceptionMessage", e.getMessage());
errorBlob.put("exceptionClass", e.getClass().getSimpleName());
errorBlob.put("exceptionTrace", Arrays.toString(e.getStackTrace()));

errorBlob.put("phase", phase.toString());

String currentStep = (nextStep != null) ? nextStep.getClass().getSimpleName() : "null";
errorBlob.put("currentStep", currentStep);

String currentEntry = (cmsEntry.isPresent()) ? cmsEntry.toString() : "null";
errorBlob.put("cmsEntry", currentEntry);


logger.error(errorBlob.toString());
}
}
41 changes: 20 additions & 21 deletions RFS/src/main/java/com/rfs/cms/CmsClient.java
Original file line number Diff line number Diff line change
@@ -1,46 +1,45 @@
package com.rfs.cms;

import java.util.Optional;

/*
* Client to connect to and work with the Coordinating Metadata Store. The CMS could be implemented by any reasonable
* data store option (Postgres, AWS DynamoDB, Elasticsearch/Opensearch, etc).
*/
public interface CmsClient {
/*
* Creates a new entry in the CMS for the Snapshot's progress. Returns true if we created the entry, and false if
* the entry already exists.
*/
public boolean createSnapshotEntry(String snapshotName);

/*
* Attempt to retrieve the Snapshot entry from the CMS, if it exists; null if it doesn't currently exist
* Creates a new entry in the CMS for the Snapshot's progress. Returns an Optional; if the document was created, it
* will be the created object and empty otherwise.
*/
public CmsEntry.Snapshot getSnapshotEntry(String snapshotName);
public Optional<CmsEntry.Snapshot> createSnapshotEntry(String snapshotName);

/*
* Updates the status of the Snapshot entry in the CMS. Returns true if the update was successful, and false if
* something else updated it before we could
* Attempt to retrieve the Snapshot entry from the CMS. Returns an Optional; if the document exists, it will be the
* retrieved entry and empty otherwise.
*/
public boolean updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status);
public Optional<CmsEntry.Snapshot> getSnapshotEntry(String snapshotName);

/*
* Creates a new entry in the CMS for the Metadata Migration's progress. Returns true if we created the entry, and
* false if the entry already exists.
* Updates the Snapshot entry in the CMS. Returns an Optional; if the document was updated, it will be
* the updated entry and empty otherwise.
*/
public boolean createMetadataEntry();
public Optional<CmsEntry.Snapshot> updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status);

/*
* Attempt to retrieve the Metadata Migration entry from the CMS, if it exists; null if it doesn't currently exist
* Creates a new entry in the CMS for the Metadata Migration's progress. Returns an Optional; if the document was
* created, it will be the created entry and empty otherwise.
*/
public CmsEntry.Metadata getMetadataEntry();
public Optional<CmsEntry.Metadata> createMetadataEntry();

/*
* Updates just the status field of the Metadata Migration entry in the CMS. Returns true if the update was successful,
* Attempt to retrieve the Metadata Migration entry from the CMS, if it exists. Returns an Optional; if the document
* exists, it will be the retrieved entry and empty otherwise.
*/
public boolean setMetadataMigrationStatus(CmsEntry.MetadataStatus status);
public Optional<CmsEntry.Metadata> getMetadataEntry();

/*
* Updates all fields of the Metadata Migration entry in the CMS. Returns true if the update was successful, and
* false if something else updated it before we could
* Updates the Metadata Migration entry in the CMS. Returns an Optional; if the document was updated,
* it will be the updated entry and empty otherwise.
*/
public boolean updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts);
public Optional<CmsEntry.Metadata> updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts);
}
28 changes: 26 additions & 2 deletions RFS/src/main/java/com/rfs/cms/CmsEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,35 @@
import com.rfs.common.RfsException;

public class CmsEntry {
public abstract static class Base {
protected Base() {}
public abstract String toString();
}

public static enum SnapshotStatus {
NOT_STARTED,
IN_PROGRESS,
COMPLETED,
FAILED,
}

public static class Snapshot {
public static class Snapshot extends Base {
public final String name;
public final SnapshotStatus status;

public Snapshot(String name, SnapshotStatus status) {
super();
this.name = name;
this.status = status;
}

@Override
public String toString() {
return "Snapshot("
+ "name='" + name + ","
+ "status=" + status +
")";
}
}

public static enum MetadataStatus {
Expand All @@ -26,7 +40,7 @@ public static enum MetadataStatus {
FAILED,
}

public static class Metadata {
public static class Metadata extends Base {
public static final int METADATA_LEASE_MS = 1 * 60 * 1000; // 1 minute, arbitrarily chosen
public static final int MAX_ATTEMPTS = 3; // arbitrarily chosen

Expand All @@ -50,10 +64,20 @@ public static String getLeaseExpiry(long currentTime, int numAttempts) {
public final Integer numAttempts;

public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) {
super();
this.status = status;
this.leaseExpiry = leaseExpiry;
this.numAttempts = numAttempts;
}

@Override
public String toString() {
return "Metadata("
+ "status=" + status.toString() + ","
+ "leaseExpiry=" + leaseExpiry + ","
+ "numAttempts=" + numAttempts.toString() +
")";
}
}

public static class CouldNotFindNextLeaseDuration extends RfsException {
Expand Down
Loading

0 comments on commit 9e11ff0

Please sign in to comment.