Skip to content

Commit

Permalink
Added backoff to RFS Metadata work leases (opensearch-project#650)
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma authored May 13, 2024
1 parent 45f72ea commit a2b9fdc
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 9 deletions.
22 changes: 22 additions & 0 deletions RFS/src/main/java/com/rfs/cms/CmsEntry.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.rfs.cms;

import com.rfs.common.RfsException;

public class CmsEntry {
public static enum SnapshotStatus {
Expand Down Expand Up @@ -29,6 +30,21 @@ public static class Metadata {
public static final int METADATA_LEASE_MS = 1 * 60 * 1000; // 1 minute, arbitrarily chosen
public static final int MAX_ATTEMPTS = 3; // arbitrarily chosen

public static int getLeaseDurationMs(int numAttempts) {
if (numAttempts > MAX_ATTEMPTS) {
throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is greater than MAX_ATTEMPTS=" + MAX_ATTEMPTS);
} else if (numAttempts < 1) {
throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is less than 1");
}
return METADATA_LEASE_MS * numAttempts; // Arbitratily chosen algorithm
}

// TODO: We should be ideally setting the lease expiry using the server's clock, but it's unclear on the best
// way to do this. For now, we'll just use the client's clock.
public static String getLeaseExpiry(long currentTime, int numAttempts) {
return Long.toString(currentTime + getLeaseDurationMs(numAttempts));
}

public final MetadataStatus status;
public final String leaseExpiry;
public final Integer numAttempts;
Expand All @@ -39,4 +55,10 @@ public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) {
this.numAttempts = numAttempts;
}
}

public static class CouldNotFindNextLeaseDuration extends RfsException {
public CouldNotFindNextLeaseDuration(String message) {
super("Could not find next lease duration. Reason: " + message);
}
}
}
4 changes: 2 additions & 2 deletions RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.rfs.cms;

import java.util.Date;
import java.time.Instant;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -58,7 +58,7 @@ public static ObjectNode getInitial() {

// TODO: We should be ideally setting the lease using the server's clock, but it's unclear on the best way
// to do this. For now, we'll just use the client's clock.
metadataDoc.put(FIELD_LEASE_EXPIRY, new Date().getTime() + CmsEntry.Metadata.METADATA_LEASE_MS);
metadataDoc.put(FIELD_LEASE_EXPIRY, CmsEntry.Metadata.getLeaseExpiry(Instant.now().toEpochMilli(), 1));

return metadataDoc;
}
Expand Down
6 changes: 3 additions & 3 deletions RFS/src/main/java/com/rfs/worker/MetadataStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ public AcquireLease(SharedMembers members, CmsEntry.Metadata existingEntry) {
this.existingEntry = existingEntry;
}

protected Instant getNow() {
return Instant.now();
protected long getNowMs() {
return Instant.now().toEpochMilli();
}

@Override
Expand All @@ -166,7 +166,7 @@ public void run() {
// TODO: Should be using the server-side clock here
this.acquiredLease = members.cmsClient.updateMetadataEntry(
CmsEntry.MetadataStatus.IN_PROGRESS,
String.valueOf(getNow().plusMillis(CmsEntry.Metadata.METADATA_LEASE_MS).toEpochMilli()),
CmsEntry.Metadata.getLeaseExpiry(getNowMs(), existingEntry.numAttempts + 1),
existingEntry.numAttempts + 1
);

Expand Down
48 changes: 48 additions & 0 deletions RFS/src/test/java/com/rfs/cms/CmsEntryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.rfs.cms;

import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class CmsEntryTest {

static Stream<Arguments> provide_Metadata_getLeaseExpiry_HappyPath_args() {
// Generate an argument for each possible number of attempts
Stream<Arguments> argStream = Stream.of();
for (int i = 1; i <= CmsEntry.Metadata.MAX_ATTEMPTS; i++) {
argStream = Stream.concat(argStream, Stream.of(Arguments.of(i)));
}
return argStream;
}

@ParameterizedTest
@MethodSource("provide_Metadata_getLeaseExpiry_HappyPath_args")
void Metadata_getLeaseExpiry_HappyPath(int numAttempts) {
// Run the test
String result = CmsEntry.Metadata.getLeaseExpiry(0, numAttempts);

// Check the results
assertEquals(Long.toString(CmsEntry.Metadata.METADATA_LEASE_MS * numAttempts), result);
}

static Stream<Arguments> provide_Metadata_getLeaseExpiry_UnhappyPath_args() {
return Stream.of(
Arguments.of(0),
Arguments.of(CmsEntry.Metadata.MAX_ATTEMPTS + 1)
);
}

@ParameterizedTest
@MethodSource("provide_Metadata_getLeaseExpiry_UnhappyPath_args")
void Metadata_getLeaseExpiry_UnhappyPath(int numAttempts) {
// Run the test
assertThrows(CmsEntry.CouldNotFindNextLeaseDuration.class, () -> {
CmsEntry.Metadata.getLeaseExpiry(0, numAttempts);
});
}
}
8 changes: 4 additions & 4 deletions RFS/src/test/java/com/rfs/worker/MetadataStepTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ public TestAcquireLease(SharedMembers members, CmsEntry.Metadata existingEntry)
}

@Override
protected Instant getNow() {
return Instant.ofEpochMilli(milliSinceEpoch);
protected long getNowMs() {
return milliSinceEpoch;
}
}

Expand All @@ -212,7 +212,7 @@ void AcquireLease_AsExpected(boolean acquiredLease, Class<?> nextStepClass) {
// Set up the test
CmsEntry.Metadata existingEntry = new CmsEntry.Metadata(
CmsEntry.MetadataStatus.IN_PROGRESS,
"0",
CmsEntry.Metadata.getLeaseExpiry(0L, CmsEntry.Metadata.MAX_ATTEMPTS - 1),
CmsEntry.Metadata.MAX_ATTEMPTS - 1
);

Expand All @@ -228,7 +228,7 @@ void AcquireLease_AsExpected(boolean acquiredLease, Class<?> nextStepClass) {
// Check the results
Mockito.verify(testMembers.cmsClient, times(1)).updateMetadataEntry(
CmsEntry.MetadataStatus.IN_PROGRESS,
String.valueOf(TestAcquireLease.milliSinceEpoch + CmsEntry.Metadata.METADATA_LEASE_MS),
CmsEntry.Metadata.getLeaseExpiry(TestAcquireLease.milliSinceEpoch, CmsEntry.Metadata.MAX_ATTEMPTS),
CmsEntry.Metadata.MAX_ATTEMPTS
);
assertEquals(nextStepClass, nextStep.getClass());
Expand Down

0 comments on commit a2b9fdc

Please sign in to comment.