diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexCoordinator.kt index a1878fd96..f1b6202da 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexCoordinator.kt @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getClusterStateManagedIndexConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getPolicyID import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.retry import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.shouldCreateManagedIndexConfig @@ -41,6 +42,9 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.deleteMan import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getCreateManagedIndexRequests import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getDeleteManagedIndexRequests import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getSweptManagedIndexSearchRequest +import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.isFailed +import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.isPolicyCompleted +import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.updateEnableManagedIndexRequest import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -49,10 +53,13 @@ import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.elasticsearch.ExceptionsHelper import org.elasticsearch.action.DocWriteRequest +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse import org.elasticsearch.action.bulk.BackoffPolicy import org.elasticsearch.action.bulk.BulkRequest import org.elasticsearch.action.bulk.BulkResponse import org.elasticsearch.action.search.SearchResponse +import org.elasticsearch.action.support.IndicesOptions import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.ClusterChangedEvent @@ -98,7 +105,8 @@ class ManagedIndexCoordinator( private val threadPool: ThreadPool, indexStateManagementIndices: IndexStateManagementIndices ) : LocalNodeMasterListener, ClusterStateListener, - CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")), LifecycleListener() { + CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")), + LifecycleListener() { private val logger = LogManager.getLogger(javaClass) private val ismIndices = indexStateManagementIndices @@ -127,10 +135,9 @@ class ManagedIndexCoordinator( indexStateManagementEnabled = it if (!indexStateManagementEnabled) disable() else enable() } - clusterService.clusterSettings - .addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) { - millis, count -> retryPolicy = BackoffPolicy.constantBackoff(millis, count) - } + clusterService.clusterSettings.addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) { + millis, count -> retryPolicy = BackoffPolicy.constantBackoff(millis, count) + } } override fun onMaster() { @@ -172,6 +179,17 @@ class ManagedIndexCoordinator( private fun enable() { initBackgroundSweep() indexStateManagementEnabled = true + + // Calling initBackgroundSweep() beforehand runs a sweep ensuring that policies removed from indices + // and indices being deleted are accounted for prior to re-enabling jobs + launch { + try { + logger.debug("Re-enabling jobs for managed indices") + reenableJobs() + } catch (e: Exception) { + logger.error("Failed to re-enable jobs for managed indices", e) + } + } } private fun disable() { @@ -179,6 +197,35 @@ class ManagedIndexCoordinator( indexStateManagementEnabled = false } + private suspend fun reenableJobs() { + val clusterStateRequest = ClusterStateRequest() + .clear() + .metaData(true) + .local(false) + .indices("*") + .indicesOptions(IndicesOptions.strictExpand()) + + val response: ClusterStateResponse = client.admin().cluster().suspendUntil { state(clusterStateRequest, it) } + + /* + * Iterate through all indices and create update requests to update the ManagedIndexConfig for indices that + * meet the following conditions: + * 1. Is being managed (has ManagedIndexMetaData) + * 2. Does not have a completed Policy + * 3. Does not have a failed Policy + */ + val updateManagedIndicesRequests: List> = response.state.metaData.indices.mapNotNull { + val managedIndexMetaData = it.value.getManagedIndexMetaData() + if (!(managedIndexMetaData == null || managedIndexMetaData.isPolicyCompleted || managedIndexMetaData.isFailed)) { + updateEnableManagedIndexRequest(it.value.indexUUID) + } else { + null + } + } + + updateManagedIndices(updateManagedIndicesRequests, false) + } + private fun isIndexStateManagementEnabled(): Boolean = indexStateManagementEnabled == true @OpenForTesting diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt index 5fe19cb5e..e70b2a442 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt @@ -33,7 +33,9 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedi import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StateMetaData import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL +import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.managedIndexConfigIndexRequest @@ -108,6 +110,7 @@ object ManagedIndexRunner : ScheduledJobRunner, private lateinit var xContentRegistry: NamedXContentRegistry private lateinit var scriptService: ScriptService private lateinit var settings: Settings + private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED @Suppress("MagicNumber") private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3) @Suppress("MagicNumber") @@ -148,6 +151,11 @@ object ManagedIndexRunner : ScheduledJobRunner, clusterService.clusterSettings.addSettingsUpdateConsumer(JOB_INTERVAL) { jobInterval = it } + + indexStateManagementEnabled = INDEX_STATE_MANAGEMENT_ENABLED.get(settings) + clusterService.clusterSettings.addSettingsUpdateConsumer(INDEX_STATE_MANAGEMENT_ENABLED) { + indexStateManagementEnabled = it + } return this } @@ -218,6 +226,13 @@ object ManagedIndexRunner : ScheduledJobRunner, val action: Action? = state?.getActionToExecute(clusterService, scriptService, client, managedIndexMetaData) val step: Step? = action?.getStepToExecute() + // If Index State Management is disabled and the current step is not null and safe to disable on + // then disable the job and return early + if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) { + disableManagedIndexConfig(managedIndexConfig) + return + } + if (action?.hasTimedOut(managedIndexMetaData.actionMetaData) == true) { val info = mapOf("message" to "Action timed out") logger.error("Action=${action.type.type} has timed out") diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/settings/ManagedIndexSettings.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/settings/ManagedIndexSettings.kt index 3fc9434cb..dac186af2 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/settings/ManagedIndexSettings.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/settings/ManagedIndexSettings.kt @@ -21,11 +21,12 @@ import java.util.concurrent.TimeUnit class ManagedIndexSettings { companion object { + const val DEFAULT_ISM_ENABLED = true const val DEFAULT_JOB_INTERVAL = 5 val INDEX_STATE_MANAGEMENT_ENABLED = Setting.boolSetting( "opendistro.index_state_management.enabled", - true, + DEFAULT_ISM_ENABLED, Setting.Property.NodeScope, Setting.Property.Dynamic ) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt index afc2e5151..82d81697e 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt @@ -23,7 +23,7 @@ import org.elasticsearch.common.io.stream.Writeable import java.time.Instant import java.util.Locale -abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMetaData) { +abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMetaData, val isSafeToDisableOn: Boolean = true) { abstract suspend fun execute() diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt index 3f7734488..0a55983c0 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt @@ -35,7 +35,7 @@ class WaitForForceMergeStep( val client: Client, val config: ForceMergeActionConfig, managedIndexMetaData: ManagedIndexMetaData -) : Step(name, managedIndexMetaData) { +) : Step(name, managedIndexMetaData, false) { private val logger = LogManager.getLogger(javaClass) private var stepStatus = StepStatus.STARTING diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt index 06fbc41b8..3107044f3 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt @@ -398,6 +398,10 @@ val ManagedIndexMetaData.isFailed: Boolean return false } +// Adding predicate extension to allow cleaner checks since policyCompleted is nullable +val ManagedIndexMetaData.isPolicyCompleted: Boolean + get() = this.policyCompleted == true + /** * We will change the policy if a change policy exists and if we are currently in * a Transitions action (which means we're safely at the end of a state). If a diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt index c182c1d14..2635d37e9 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -327,6 +327,29 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { } } + @Suppress("UNCHECKED_CAST") + protected fun getSegmentCount(index: String): Int { + val statsResponse: Map = getStats(index) + + // Assert that shard count of stats response is 1 since the stats request being used is at the index level + // (meaning the segment count in the response is aggregated) but segment count for force merge + // (which this method is primarily being used for) is going to be validated per shard + val shardsInfo = statsResponse["_shards"] as Map + assertEquals("Shard count higher than expected", 1, shardsInfo["successful"]) + + val indicesStats = statsResponse["indices"] as Map>>> + return indicesStats[index]!!["primaries"]!!["segments"]!!["count"] as Int + } + + /** Get stats for [index] */ + private fun getStats(index: String): Map { + val response = client().makeRequest("GET", "/$index/_stats") + + assertEquals("Stats request failed", RestStatus.OK, response.restStatus()) + + return response.asMap() + } + @Suppress("UNCHECKED_CAST") protected fun getIndexState(indexName: String): String { val request = Request("GET", "/_cluster/state") diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt index c928ff6fb..e95f602c6 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt @@ -16,7 +16,6 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.action import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase -import com.amazon.opendistroforelasticsearch.indexstatemanagement.makeRequest import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ForceMergeActionConfig @@ -24,7 +23,6 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNot import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import org.elasticsearch.cluster.metadata.IndexMetaData import org.elasticsearch.common.settings.Settings -import org.elasticsearch.rest.RestStatus import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Locale @@ -148,27 +146,4 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } } - - @Suppress("UNCHECKED_CAST") - private fun getSegmentCount(index: String): Int { - val statsResponse: Map = getStats(index) - - // Assert that shard count of stats response is 1 since the stats request being used is at the index level - // (meaning the segment count in the response is aggregated) but segment count for force merge is going to be - // validated per shard - val shardsInfo = statsResponse["_shards"] as Map - assertEquals("Shard count higher than expected", 1, shardsInfo["successful"]) - - val indicesStats = statsResponse["indices"] as Map>>> - return indicesStats[index]!!["primaries"]!!["segments"]!!["count"] as Int - } - - /** Get stats for [index] */ - private fun getStats(index: String): Map { - val response = client().makeRequest("GET", "/$index/_stats") - - assertEquals("Stats request failed", RestStatus.OK, response.restStatus()) - - return response.asMap() - } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt index 19599570c..bd25c24dc 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt @@ -18,11 +18,21 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.coordinator import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase import com.amazon.opendistroforelasticsearch.indexstatemanagement.makeRequest +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Transition +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.DeleteActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ForceMergeActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.xcontent.XContentType +import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.Locale class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { @@ -117,4 +127,190 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { ) } } + + fun `test disabling and reenabling ism`() { + val indexName = "test_disable_ism_index-000001" + val policyID = "test_policy_1" + + // Create a policy with one State that performs rollover + val rolloverActionConfig = RolloverActionConfig(index = 0, minDocs = 5, minAge = null, minSize = null) + val states = listOf(State(name = "RolloverState", actions = listOf(rolloverActionConfig), transitions = listOf())) + val policy = Policy( + id = policyID, + description = "$policyID description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + createIndex(indexName, policyID, "some_alias") + + // Add 5 documents so rollover condition will succeed + insertSampleData(indexName, docCount = 5) + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so the job will trigger in 2 seconds and init policy + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policy.id, getExplainManagedIndexMetaData(indexName).policyID) } + + // Expect the Explain API to show an initialized ManagedIndexMetaData with the default state from the policy + waitFor { assertEquals(policy.defaultState, getExplainManagedIndexMetaData(indexName).stateMetaData?.name) } + + // Disable Index State Management + updateClusterSetting(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.key, "false") + + // Speed up to next execution where job should get disabled + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Confirm job was disabled + val disabledManagedIndexConfig: ManagedIndexConfig = waitFor { + val config = getExistingManagedIndexConfig(indexName) + assertEquals("ManagedIndexConfig was not disabled", false, config.enabled) + config + } + + // Speed up to next execution and confirm that Explain API still shows information of policy initialization + updateManagedIndexConfigStartTime(disabledManagedIndexConfig) + + waitFor { + val expectedInfoString = mapOf("message" to "Successfully initialized policy: $policyID").toString() + assertPredicatesOnMetaData( + listOf( + indexName to listOf( + ManagedIndexMetaData.INDEX to indexName::equals, + ManagedIndexMetaData.POLICY_ID to policyID::equals, + ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString() + ) + ), + getExplainMap(indexName), + false + ) + } + + // Re-enable Index State Management + updateClusterSetting(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.key, "true") + + // Confirm job was re-enabled + val enableddManagedIndexConfig: ManagedIndexConfig = waitFor { + val config = getExistingManagedIndexConfig(indexName) + assertEquals("ManagedIndexConfig was not re-enabled", true, config.enabled) + config + } + + // Speed up to next execution where the job should be rescheduled and the index rolled over + updateManagedIndexConfigStartTime(enableddManagedIndexConfig) + + waitFor { assertEquals("Rolled over index", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + } + + fun `test not disabling ism on unsafe step`() { + val indexName = "test_safe_disable_ism" + val policyID = "test_policy_1" + + // Create a policy with one State that performs force_merge and another State that deletes the index + val forceMergeActionConfig = ForceMergeActionConfig(index = 0, maxNumSegments = 1) + val deleteActionConfig = DeleteActionConfig(index = 0) + val states = listOf( + State( + name = "ForceMergeState", + actions = listOf(forceMergeActionConfig), + transitions = listOf(Transition(stateName = "DeleteState", conditions = null)) + ), + State(name = "DeleteState", actions = listOf(deleteActionConfig), transitions = listOf()) + ) + + val policy = Policy( + id = policyID, + description = "$policyID description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + createIndex(indexName, policyID) + + // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created + insertSampleData(indexName, 3, 1000) + + waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) } + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Will change the startTime each execution so that it triggers in 2 seconds + // First execution: Policy is initialized + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + // Expect the Explain API to show an initialized ManagedIndexMetaData with the default state from the policy + waitFor { assertEquals(policy.defaultState, getExplainManagedIndexMetaData(indexName).stateMetaData?.name) } + + // Second execution: Index is set to read-only for force_merge + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } + + // Third execution: Force merge operation is kicked off + updateManagedIndexConfigStartTime(managedIndexConfig) + Thread.sleep(3000) + + // Verify maxNumSegments is set in action properties when kicking off force merge + waitFor { + assertEquals( + "maxNumSegments not set in ActionProperties", + forceMergeActionConfig.maxNumSegments, + getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.maxNumSegments + ) + } + + // Disable Index State Management + updateClusterSetting(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.key, "false") + + // Fourth execution: WaitForForceMergeStep is not safe to disable on, so the job should not disable yet + updateManagedIndexConfigStartTime(managedIndexConfig) + Thread.sleep(3000) + + // Confirm job was not disabled + assertEquals("ManagedIndexConfig was disabled early", true, getExistingManagedIndexConfig(indexName).enabled) + + // Validate segments were merged + waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } + + // Fifth execution: Attempt transition, which is safe to disable on, so job should be disabled + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Explain API info should still be that of the last executed Step + waitFor { assertEquals("Force merge completed", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + + // Confirm job was disabled + val disabledManagedIndexConfig: ManagedIndexConfig = waitFor { + val config = getExistingManagedIndexConfig(indexName) + assertEquals("ManagedIndexConfig was not disabled", false, config.enabled) + config + } + + // Speed up to next execution to confirm Explain API still shows information of the last executed step (WaitForForceMergeStep) + updateManagedIndexConfigStartTime(disabledManagedIndexConfig) + + waitFor { + val expectedInfoString = mapOf("message" to "Force merge completed").toString() + assertPredicatesOnMetaData( + listOf( + indexName to listOf( + ManagedIndexMetaData.INDEX to indexName::equals, + ManagedIndexMetaData.POLICY_ID to policyID::equals, + ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString() + ) + ), + getExplainMap(indexName), + false + ) + } + } }