diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 796f09cb9528f..be849452c0f5e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import org.apache.lucene.index.SegmentInfos; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -16,6 +17,8 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Nullable; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.Index; @@ -27,12 +30,14 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -230,4 +235,14 @@ protected void assertReplicaCheckpointUpdated(IndexShard primaryShard) throws Ex } }, 30, TimeUnit.SECONDS); } + + /** + * Returns the latest SIS for a shard but does not incref the segments. + */ + protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOException { + final Tuple, ReplicationCheckpoint> tuple = shard.getLatestSegmentInfosAndCheckpoint(); + try (final GatedCloseable closeable = tuple.v1()) { + return closeable.get(); + } + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 4a848e92800cb..87dd48de38d3e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -53,12 +53,11 @@ import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.opensearch.common.action.ActionFuture; -import org.opensearch.common.collect.Tuple; -import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.set.Sets; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; @@ -73,7 +72,6 @@ import org.opensearch.index.engine.NRTReplicationReaderManager; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.recovery.FileChunkRequest; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; import org.opensearch.search.SearchService; @@ -92,6 +90,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -1053,32 +1053,31 @@ private void assertAllocationIdsInReplicaShardStats(Set expected, Set assertEquals( getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() ) ); - final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME); - final Tuple, ReplicationCheckpoint> tuple = replicaShard.getLatestSegmentInfosAndCheckpoint(); - final Collection snapshottedSegments; - try (final GatedCloseable closeable = tuple.v1()) { - snapshottedSegments = closeable.get().files(false); - } + // opens a scrolled query before a flush is called. // this is for testing scroll segment consistency between refresh and flush SearchResponse searchResponse = client(replica).prepareSearch() @@ -1092,17 +1091,20 @@ public void testScrollCreatedOnReplica() throws Exception { .setScroll(TimeValue.timeValueDays(1)) .get(); - // force call flush - flush(INDEX_NAME); + final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME); + SegmentInfos latestSegmentInfos = getLatestSegmentInfos(replicaShard); + final Set snapshottedSegments = new HashSet<>(latestSegmentInfos.files(false)); + logger.info("Segments {}", snapshottedSegments); - for (int i = 3; i < 5; i++) { - client().prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + // index more docs and force merge down to 1 segment + for (int i = 1; i < 5; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); refresh(INDEX_NAME); - if (randomBoolean()) { - client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get(); - flush(INDEX_NAME); - } } + // create new on-disk segments and copy them out. assertBusy(() -> { assertEquals( getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), @@ -1110,13 +1112,19 @@ public void testScrollCreatedOnReplica() throws Exception { ); }); + // force merge and flush. client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get(); + // wait for replication to complete assertBusy(() -> { assertEquals( getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() ); }); + logger.info("Local segments after force merge and commit {}", getLatestSegmentInfos(replicaShard).files(false)); + List filesBeforeClearScroll = List.of(replicaShard.store().directory().listAll()); + assertTrue("Files should be preserved", filesBeforeClearScroll.containsAll(snapshottedSegments)); + // Test stats logger.info("--> Collect all scroll query hits"); long scrollHits = 0; @@ -1125,20 +1133,23 @@ public void testScrollCreatedOnReplica() throws Exception { searchResponse = client(replica).prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueDays(1)).get(); assertAllSuccessful(searchResponse); } while (searchResponse.getHits().getHits().length > 0); - - List currentFiles = List.of(replicaShard.store().directory().listAll()); - assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments)); + assertEquals(1, scrollHits); client(replica).prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); - - assertBusy( - () -> assertFalse( - "Files should be cleaned up post scroll clear request", - List.of(replicaShard.store().directory().listAll()).containsAll(snapshottedSegments) - ) + final Set filesAfterClearScroll = Arrays.stream(replicaShard.store().directory().listAll()).collect(Collectors.toSet()); + // there should be no active readers, snapshots, or on-disk commits containing the snapshotted files, check that they have been + // deleted. + Set latestCommitSegments = new HashSet<>(replicaShard.store().readLastCommittedSegmentsInfo().files(false)); + assertEquals( + "Snapshotted files are no longer part of the latest commit", + Collections.emptySet(), + Sets.intersection(latestCommitSegments, snapshottedSegments) + ); + assertEquals( + "All snapshotted files should be deleted", + Collections.emptySet(), + Sets.intersection(filesAfterClearScroll, snapshottedSegments) ); - assertEquals(10, scrollHits); - } /** diff --git a/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java b/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java index 19454967f9ee3..a9cc24abe3c01 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java +++ b/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java @@ -17,7 +17,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * This class is heavily influenced by Lucene's ReplicaFileDeleter class used to keep track of @@ -31,10 +31,10 @@ final class ReplicaFileTracker { public static final Logger logger = LogManager.getLogger(ReplicaFileTracker.class); private final Map refCounts = new HashMap<>(); - private final BiConsumer fileDeleter; + private final Consumer fileDeleter; private final Set EXCLUDE_FILES = Set.of("write.lock"); - public ReplicaFileTracker(BiConsumer fileDeleter) { + public ReplicaFileTracker(Consumer fileDeleter) { this.fileDeleter = fileDeleter; } @@ -82,7 +82,7 @@ private synchronized void delete(Collection toDelete) { private synchronized void delete(String fileName) { assert canDelete(fileName); - fileDeleter.accept("delete unreferenced", fileName); + fileDeleter.accept(fileName); } private synchronized boolean canDelete(String fileName) {