Skip to content

Commit

Permalink
Updates per PR comments (mostly adding @OverRide)
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma committed Apr 26, 2024
1 parent fb5e8c7 commit 395736e
Show file tree
Hide file tree
Showing 26 changed files with 156 additions and 5 deletions.
9 changes: 9 additions & 0 deletions RFS/src/main/java/com/rfs/common/FilesystemRepo.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,46 +38,55 @@ public FilesystemRepo(Path repoRootDir) {
this.repoRootDir = repoRootDir;
}

@Override
public Path getRepoRootDir() {
return repoRootDir;
}

@Override
public Path getSnapshotRepoDataFilePath() throws IOException {
return findRepoFile();
}

@Override
public Path getGlobalMetadataFilePath(String snapshotId) throws IOException {
String filePath = getRepoRootDir().toString() + "/meta-" + snapshotId + ".dat";
return Path.of(filePath);
}

@Override
public Path getSnapshotMetadataFilePath(String snapshotId) throws IOException {
String filePath = getRepoRootDir().toString() + "/snap-" + snapshotId + ".dat";
return Path.of(filePath);
}

@Override
public Path getIndexMetadataFilePath(String indexId, String indexFileId) throws IOException {
String filePath = getRepoRootDir().toString() + "/indices/" + indexId + "/meta-" + indexFileId + ".dat";
return Path.of(filePath);
}

@Override
public Path getShardDirPath(String indexId, int shardId) throws IOException {
String shardDirPath = getRepoRootDir().toString() + "/indices/" + indexId + "/" + shardId;
return Path.of(shardDirPath);
}

@Override
public Path getShardMetadataFilePath(String snapshotId, String indexId, int shardId) throws IOException {
Path shardDirPath = getShardDirPath(indexId, shardId);
Path filePath = shardDirPath.resolve("snap-" + snapshotId + ".dat");
return filePath;
}

@Override
public Path getBlobFilePath(String indexId, int shardId, String blobName) throws IOException {
Path shardDirPath = getShardDirPath(indexId, shardId);
Path filePath = shardDirPath.resolve(blobName);
return filePath;
}

@Override
public void prepBlobFiles(ShardMetadata.Data shardMetadata) throws IOException {
// No work necessary for local filesystem
}
Expand Down
14 changes: 13 additions & 1 deletion RFS/src/main/java/com/rfs/common/S3Repo.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

public class S3Repo implements SourceRepo {
private static final Logger logger = LogManager.getLogger(S3Repo.class);
private static final double S3_TARGET_THROUGHPUT_GIBPS = 10.0; // Arbitrarily chosen
private static final double S3_TARGET_THROUGHPUT_GIBPS = 8.0; // Arbitrarily chosen
private static final long S3_MAX_MEMORY_BYTES = 1024L * 1024 * 1024; // Arbitrarily chosen
private static final long S3_MINIMUM_PART_SIZE_BYTES = 8L * 1024 * 1024; // Default, but be explicit

private final Path s3LocalDir;
Expand Down Expand Up @@ -89,7 +90,9 @@ public static S3Repo create(Path s3LocalDir, S3Uri s3Uri, String s3Region) {
S3AsyncClient s3Client = S3AsyncClient.crtBuilder()
.credentialsProvider(DefaultCredentialsProvider.create())
.region(Region.of(s3Region))
.retryConfiguration(r -> r.numRetries(3))
.targetThroughputInGbps(S3_TARGET_THROUGHPUT_GIBPS)
.maxNativeMemoryLimitInBytes(S3_MAX_MEMORY_BYTES)
.minimumPartSizeInBytes(S3_MINIMUM_PART_SIZE_BYTES)
.build();

Expand All @@ -103,10 +106,12 @@ public S3Repo(Path s3LocalDir, S3Uri s3Uri, String s3Region, S3AsyncClient s3Cli
this.s3Client = s3Client;
}

@Override
public Path getRepoRootDir() {
return s3LocalDir;
}

@Override
public Path getSnapshotRepoDataFilePath() throws IOException {
S3Uri repoFileS3Uri = findRepoFileUri();

Expand All @@ -118,6 +123,7 @@ public Path getSnapshotRepoDataFilePath() throws IOException {
return localFilePath;
}

@Override
public Path getGlobalMetadataFilePath(String snapshotId) throws IOException {
String suffix = "meta-" + snapshotId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
Expand All @@ -126,6 +132,7 @@ public Path getGlobalMetadataFilePath(String snapshotId) throws IOException {
return filePath;
}

@Override
public Path getSnapshotMetadataFilePath(String snapshotId) throws IOException {
String suffix = "snap-" + snapshotId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
Expand All @@ -134,6 +141,7 @@ public Path getSnapshotMetadataFilePath(String snapshotId) throws IOException {
return filePath;
}

@Override
public Path getIndexMetadataFilePath(String indexId, String indexFileId) throws IOException {
String suffix = "indices/" + indexId + "/meta-" + indexFileId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
Expand All @@ -142,12 +150,14 @@ public Path getIndexMetadataFilePath(String indexId, String indexFileId) throws
return filePath;
}

@Override
public Path getShardDirPath(String indexId, int shardId) throws IOException {
String suffix = "indices/" + indexId + "/" + shardId;
Path shardDirPath = s3LocalDir.resolve(suffix);
return shardDirPath;
}

@Override
public Path getShardMetadataFilePath(String snapshotId, String indexId, int shardId) throws IOException {
String suffix = "indices/" + indexId + "/" + shardId + "/snap-" + snapshotId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
Expand All @@ -156,6 +166,7 @@ public Path getShardMetadataFilePath(String snapshotId, String indexId, int shar
return filePath;
}

@Override
public Path getBlobFilePath(String indexId, int shardId, String blobName) throws IOException {
String suffix = "indices/" + indexId + "/" + shardId + "/" + blobName;
Path filePath = s3LocalDir.resolve(suffix);
Expand All @@ -164,6 +175,7 @@ public Path getBlobFilePath(String indexId, int shardId, String blobName) throws
return filePath;
}

@Override
public void prepBlobFiles(ShardMetadata.Data shardMetadata) throws IOException {
S3TransferManager transferManager = S3TransferManager.builder().s3Client(s3Client).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public Transformer_ES_6_8_to_OS_2_11(int awarenessAttributeDimensionality) {
this.awarenessAttributeDimensionality = awarenessAttributeDimensionality;
}

@Override
public ObjectNode transformGlobalMetadata(ObjectNode root) {
ObjectNode newRoot = mapper.createObjectNode();

Expand Down Expand Up @@ -49,6 +50,7 @@ public ObjectNode transformGlobalMetadata(ObjectNode root) {
return newRoot;
}

@Override
public ObjectNode transformIndexMetadata(ObjectNode root){
ObjectNode newRoot = root.deepCopy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public Transformer_ES_7_10_OS_2_11(int awarenessAttributeDimensionality) {
this.awarenessAttributeDimensionality = awarenessAttributeDimensionality;
}

@Override
public ObjectNode transformGlobalMetadata(ObjectNode root){
ObjectNode newRoot = mapper.createObjectNode();

Expand Down Expand Up @@ -64,6 +65,7 @@ public ObjectNode transformGlobalMetadata(ObjectNode root){
return newRoot;
}

@Override
public ObjectNode transformIndexMetadata(ObjectNode root){
ObjectNode newRoot = root.deepCopy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public GlobalMetadataData_ES_6_8(ObjectNode root) {
this.root = root;
}

@Override
public ObjectNode toObjectNode() throws Exception {
return root;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

public class GlobalMetadataFactory_ES_6_8 implements com.rfs.common.GlobalMetadata.Factory{

@Override
public GlobalMetadata.Data fromJsonNode(JsonNode root) throws Exception {
ObjectNode metadataRoot = (ObjectNode) root.get("meta-data");
return new GlobalMetadataData_ES_6_8(metadataRoot);
}

@Override
public SmileFactory getSmileFactory() {
return ElasticsearchConstants_ES_6_8.SMILE_FACTORY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ public IndexMetadataData_ES_6_8(ObjectNode root, String indexId, String indexNam
this.indexName = indexName;
}

@Override
public ObjectNode getAliases() {
return (ObjectNode) root.get("aliases");
}

@Override
public String getId() {
return indexId;
}

@Override
public ObjectNode getMappings() {
if (mappings != null) {
return mappings;
Expand All @@ -39,14 +42,17 @@ public ObjectNode getMappings() {
return mappings;
}

@Override
public String getName() {
return indexName;
}

@Override
public int getNumberOfShards() {
return this.getSettings().get("index").get("number_of_shards").asInt();
}

@Override
public ObjectNode getSettings() {
if (settings != null) {
return settings;
Expand All @@ -61,6 +67,7 @@ public ObjectNode getSettings() {
return settings;
}

@Override
public ObjectNode toObjectNode() {
return root;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@

public class IndexMetadataFactory_ES_6_8 implements com.rfs.common.IndexMetadata.Factory {

@Override
public IndexMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName) throws Exception {
ObjectNode objectNodeRoot = (ObjectNode) root.get(indexName);
return new IndexMetadataData_ES_6_8(objectNodeRoot, indexId, indexName);
}

@Override
public SmileFactory getSmileFactory() {
return ElasticsearchConstants_ES_6_8.SMILE_FACTORY;
}

@Override
public String getIndexFileId(SnapshotRepo.Provider repoDataProvider, String snapshotName, String indexName) {
return repoDataProvider.getSnapshotId(snapshotName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,42 +60,52 @@ public ShardMetadataData_ES_6_8(
this.files = convertedFiles;
}

@Override
public String getSnapshotName() {
return snapshotName;
}

@Override
public String getIndexName() {
return indexName;
}

@Override
public String getIndexId() {
return indexId;
}

@Override
public int getShardId() {
return shardId;
}

@Override
public int getIndexVersion() {
return indexVersion;
}

@Override
public long getStartTime() {
return startTime;
}

@Override
public long getTime() {
return time;
}

@Override
public int getNumberOfFiles() {
return numberOfFiles;
}

@Override
public long getTotalSize() {
return totalSize;
}

@Override
public List<ShardMetadata.FileInfo> getFiles() {
List<ShardMetadata.FileInfo> convertedFiles = new ArrayList<>(files);
return convertedFiles;
Expand Down Expand Up @@ -194,39 +204,48 @@ public FileInfo(
this.numberOfParts = numberOfParts;
}

@Override
public String getName() {
return name;
}

@Override
public String getPhysicalName() {
return physicalName;
}

@Override
public long getLength() {
return length;
}

@Override
public String getChecksum() {
return checksum;
}

@Override
public long getPartSize() {
return partSize;
}

@Override
public String getWrittenBy() {
return writtenBy;
}

@Override
public BytesRef getMetaHash() {
return metaHash;
}

@Override
public long getNumberOfParts() {
return numberOfParts;
}

// The Snapshot file may be split into multiple blobs; use this to find the correct file name
@Override
public String partName(long part) {
if (numberOfParts > 1) {
return name + ".part" + part;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

public class ShardMetadataFactory_ES_6_8 implements ShardMetadata.Factory {

@Override
public ShardMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName, int shardId) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
SimpleModule module = new SimpleModule();
Expand All @@ -31,6 +32,7 @@ public ShardMetadata.Data fromJsonNode(JsonNode root, String indexId, String ind
);
}

@Override
public SmileFactory getSmileFactory() {
return ElasticsearchConstants_ES_6_8.SMILE_FACTORY;
}
Expand Down
Loading

0 comments on commit 395736e

Please sign in to comment.