Skip to content

Commit

Permalink
Fixed bug in S3Repo handling; added unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed Apr 22, 2024
1 parent c5ed5b9 commit 7d03870
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 23 deletions.
12 changes: 11 additions & 1 deletion RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ dependencies {
implementation 'org.apache.lucene:lucene-analyzers-common:8.11.3'
implementation 'org.apache.lucene:lucene-backward-codecs:8.11.3'
implementation 'software.amazon.awssdk:s3:2.25.16'

testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.10.2'
testImplementation 'org.mockito:mockito-core:5.11.0'
testImplementation 'org.mockito:mockito-junit-jupiter:5.11.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.2'
}

application {
Expand Down Expand Up @@ -110,4 +116,8 @@ task buildDockerImages {
}

tasks.getByName('composeUp')
.dependsOn(tasks.getByName('buildDockerImages'))
.dependsOn(tasks.getByName('buildDockerImages'))

test {
useJUnitPlatform()
}
4 changes: 4 additions & 0 deletions RFS/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ services:
condition: service_started
networks:
- migrations
environment:
- AWS_ACCESS_KEY_ID=${access_key}
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
volumes:
- snapshotStorage:/snapshots

Expand Down
2 changes: 1 addition & 1 deletion RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public static void main(String[] args) throws InterruptedException {
if (snapshotDirPath != null) {
repo = new FilesystemRepo(snapshotDirPath);
} else if (s3RepoUri != null && s3Region != null && s3LocalDirPath != null) {
repo = new S3Repo(s3LocalDirPath, s3RepoUri, s3Region);
repo = S3Repo.create(s3LocalDirPath, s3RepoUri, s3Region);
} else if (snapshotLocalRepoDirPath != null) {
repo = new FilesystemRepo(snapshotLocalRepoDirPath);
} else {
Expand Down
66 changes: 45 additions & 21 deletions RFS/src/main/java/com/rfs/common/S3Repo.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
Expand All @@ -34,9 +35,9 @@ private static int extractVersion(String key) {
}
}

private String findRepoFileUri() {
String bucketName = s3RepoUri.split("/")[2];
String prefix = s3RepoUri.split(bucketName + "/")[1];
protected String findRepoFileUri() {
String bucketName = getS3BucketName();
String prefix = getS3ObjectsPrefix();

ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
.bucket(bucketName)
Expand All @@ -46,39 +47,47 @@ private String findRepoFileUri() {
ListObjectsV2Response listResponse = s3Client.listObjectsV2(listRequest);

Optional<S3Object> highestVersionedIndexFile = listResponse.contents().stream()
.filter(s3Object -> s3Object.key().matches(".*/index-\\d+$")) // Regex to match index files
.filter(s3Object -> s3Object.key().matches(".*index-\\d+$")) // Regex to match index files
.max(Comparator.comparingInt(s3Object -> extractVersion(s3Object.key())));

return highestVersionedIndexFile
.map(s3Object -> "s3://" + bucketName + "/" + s3Object.key())
.orElse("No index files found in the specified directory.");
.orElse(null);
}

protected void ensureS3LocalDirectoryExists(Path localPath) throws IOException {
Files.createDirectories(localPath);
}

private void downloadFile(String s3Uri, Path localPath) throws IOException {
logger.info("Downloading file from S3: " + s3Uri + " to " + localPath);
Files.createDirectories(localPath.getParent());
ensureS3LocalDirectoryExists(localPath.getParent());

String bucketName = s3Uri.split("/")[2];
String key = s3Uri.split(bucketName + "/")[1];

s3Client.getObject((req) -> req.bucket(bucketName).key(key), ResponseTransformer.toFile(localPath));
}
GetObjectRequest getObjectRequest = GetObjectRequest.builder()
.bucket(bucketName)
.key(key)
.build();

public S3Repo(Path s3LocalDir, String s3Uri, String s3Region) {
this.s3LocalDir = s3LocalDir;
s3Client.getObject(getObjectRequest, ResponseTransformer.toFile(localPath));
}

// Remove any trailing slash from the S3 URI
if (s3Uri.endsWith("/")) {
this.s3RepoUri = s3Uri.substring(0, s3Uri.length() - 1);
} else {
this.s3RepoUri = s3Uri;
}

this.s3Region = s3Region;
this.s3Client = S3Client.builder()
.region(Region.of(this.s3Region))
public static S3Repo create(Path s3LocalDir, String s3Uri, String s3Region) {
S3Client s3Client = S3Client.builder()
.region(Region.of(s3Region))
.credentialsProvider(DefaultCredentialsProvider.create())
.build();

return new S3Repo(s3LocalDir, s3Uri, s3Region, s3Client);
}

public S3Repo(Path s3LocalDir, String s3Uri, String s3Region, S3Client s3Client) {
this.s3LocalDir = s3LocalDir;
this.s3RepoUri = s3Uri;
this.s3Region = s3Region;
this.s3Client = s3Client;
}

public Path getRepoRootDir() {
Expand All @@ -88,7 +97,6 @@ public Path getRepoRootDir() {
public Path getSnapshotRepoDataFilePath() throws IOException {
String repoFileS3Uri = findRepoFileUri();

// s3://bucket-name/path/to/index-1 => to/index-1
String relativeFileS3Uri = repoFileS3Uri.substring(s3RepoUri.length() + 1);

Path localFilePath = s3LocalDir.resolve(relativeFileS3Uri);
Expand Down Expand Up @@ -137,4 +145,20 @@ public Path getBlobFilePath(String indexId, int shardId, String blobName) throws
downloadFile(s3RepoUri + "/" + suffix, filePath);
return filePath;
}

public String getS3BucketName() {
String[] parts = s3RepoUri.substring(5).split("/", 2);
return parts[0];
}

public String getS3ObjectsPrefix() {
String[] parts = s3RepoUri.substring(5).split("/", 2);
String prefix = parts.length > 1 ? parts[1] : "";

if (!prefix.isEmpty() && prefix.endsWith("/")) {
prefix = prefix.substring(0, prefix.length() - 1);
}

return prefix;
}
}
Loading

0 comments on commit 7d03870

Please sign in to comment.