From 15fd4b10603132fe61d6409618b946224f10f258 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Fri, 13 Oct 2023 15:28:00 +0530 Subject: [PATCH 1/4] fix lock release before deletion is completed Signed-off-by: bansvaru --- .../opensearch/gateway/remote/RemoteClusterStateService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index b9d06c8fbb1c1..d1f9bdfad9b69 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1109,8 +1109,9 @@ public void onFailure(Exception e) { } } ); - } finally { + } catch (Exception e) { deleteStaleMetadataRunning.set(false); + throw e; } } @@ -1190,7 +1191,7 @@ private void deleteStalePaths(String clusterName, String clusterUUID, List { String clusterName = clusterState.getClusterName().value(); - logger.info("Deleting stale cluster UUIDs data from remote [{}]", clusterName); + logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName); Set allClustersUUIDsInRemote; try { allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value())); From c427c2138bbe3cab8ce310abd5e4954d6d9ebacf Mon Sep 17 00:00:00 2001 From: bansvaru Date: Fri, 20 Oct 2023 14:35:11 +0530 Subject: [PATCH 2/4] add UT Signed-off-by: bansvaru --- .../remote/RemoteClusterStateService.java | 3 +- .../RemoteClusterStateServiceTests.java | 34 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d1f9bdfad9b69..5e9591b4783b3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1072,7 +1072,8 @@ public void onFailure(Exception e) { * @param clusterUUID uuid of cluster state to refer to in remote * @param manifestsToRetain no of latest manifest files to keep in remote */ - private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { + // public for testing + public void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { logger.info("Delete stale cluster metadata task is already in progress."); return; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 49b7f0ff8d1a9..c4ce7f8a261ea 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -62,6 +62,9 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.mockito.ArgumentCaptor; @@ -73,6 +76,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -285,6 +289,36 @@ public void testWriteFullMetadataFailureForGlobalMetadata() throws IOException { ); } + public void testSingleExecutionOfDeleteTask() throws Exception { + BlobContainer blobContainer = mock(BlobContainer.class); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger callCount = new AtomicInteger(0); + doAnswer(invocation -> { + callCount.incrementAndGet(); + if (latch.await(5000, TimeUnit.SECONDS) == false) { + throw new Exception("latch timeout"); + } + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + any(String.class), + any(int.class), + any(BlobContainer.BlobNameSortOrder.class), + any(ActionListener.class) + ); + + remoteClusterStateService.start(); + remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + + latch.countDown(); + assertBusy(() -> assertEquals(1, callCount.get())); + } + public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); From be658ad867d22d5efc4f054087d37cedd24abf1d Mon Sep 17 00:00:00 2001 From: bansvaru Date: Fri, 20 Oct 2023 14:41:04 +0530 Subject: [PATCH 3/4] refactor code a bit Signed-off-by: bansvaru --- .../RemoteClusterStateServiceTests.java | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index c4ce7f8a261ea..433eac63e9580 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -289,36 +289,6 @@ public void testWriteFullMetadataFailureForGlobalMetadata() throws IOException { ); } - public void testSingleExecutionOfDeleteTask() throws Exception { - BlobContainer blobContainer = mock(BlobContainer.class); - BlobPath blobPath = new BlobPath().add("random-path"); - when((blobStoreRepository.basePath())).thenReturn(blobPath); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - - CountDownLatch latch = new CountDownLatch(1); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocation -> { - callCount.incrementAndGet(); - if (latch.await(5000, TimeUnit.SECONDS) == false) { - throw new Exception("latch timeout"); - } - return null; - }).when(blobContainer) - .listBlobsByPrefixInSortedOrder( - any(String.class), - any(int.class), - any(BlobContainer.BlobNameSortOrder.class), - any(ActionListener.class) - ); - - remoteClusterStateService.start(); - remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); - remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); - - latch.countDown(); - assertBusy(() -> assertEquals(1, callCount.get())); - } - public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); @@ -1038,6 +1008,36 @@ public void testFileNames() { assertThat(splittedName[3], is("P")); } + public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception { + BlobContainer blobContainer = mock(BlobContainer.class); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger callCount = new AtomicInteger(0); + doAnswer(invocation -> { + callCount.incrementAndGet(); + if (latch.await(5000, TimeUnit.SECONDS) == false) { + throw new Exception("Timed out waiting for delete task queuing to complete"); + } + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + any(String.class), + any(int.class), + any(BlobContainer.BlobNameSortOrder.class), + any(ActionListener.class) + ); + + remoteClusterStateService.start(); + remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + + latch.countDown(); + assertBusy(() -> assertEquals(1, callCount.get())); + } + private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { final BlobPath blobPath = mock(BlobPath.class); when((blobStoreRepository.basePath())).thenReturn(blobPath); From 52e6621d5ccdc00922e243f3cddebf34304c0889 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Fri, 20 Oct 2023 16:47:09 +0530 Subject: [PATCH 4/4] address PR comment Signed-off-by: bansvaru --- .../opensearch/gateway/remote/RemoteClusterStateService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 5e9591b4783b3..96ce2fc779ea0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1072,8 +1072,8 @@ public void onFailure(Exception e) { * @param clusterUUID uuid of cluster state to refer to in remote * @param manifestsToRetain no of latest manifest files to keep in remote */ - // public for testing - public void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { + // package private for testing + void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { logger.info("Delete stale cluster metadata task is already in progress."); return;