Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Adds logic for enabling/disabling Index State Management (#93)
Browse files Browse the repository at this point in the history
* Added logic for disabling/enabling Index State Management

* Added tests for enabling/disabling Index State Management

* Simplified isPolicyCompleted check
  • Loading branch information
qreshi authored Sep 3, 2019
1 parent 1ef017c commit e44f4ce
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.shouldCreateManagedIndexConfig
Expand All @@ -41,6 +42,9 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.deleteMan
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getCreateManagedIndexRequests
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getDeleteManagedIndexRequests
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getSweptManagedIndexSearchRequest
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.isFailed
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.isPolicyCompleted
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.updateEnableManagedIndexRequest
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand All @@ -49,10 +53,13 @@ import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse
import org.elasticsearch.action.bulk.BackoffPolicy
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.ClusterChangedEvent
Expand Down Expand Up @@ -98,7 +105,8 @@ class ManagedIndexCoordinator(
private val threadPool: ThreadPool,
indexStateManagementIndices: IndexStateManagementIndices
) : LocalNodeMasterListener, ClusterStateListener,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")), LifecycleListener() {
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")),
LifecycleListener() {

private val logger = LogManager.getLogger(javaClass)
private val ismIndices = indexStateManagementIndices
Expand Down Expand Up @@ -127,10 +135,9 @@ class ManagedIndexCoordinator(
indexStateManagementEnabled = it
if (!indexStateManagementEnabled) disable() else enable()
}
clusterService.clusterSettings
.addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) {
millis, count -> retryPolicy = BackoffPolicy.constantBackoff(millis, count)
}
clusterService.clusterSettings.addSettingsUpdateConsumer(COORDINATOR_BACKOFF_MILLIS, COORDINATOR_BACKOFF_COUNT) {
millis, count -> retryPolicy = BackoffPolicy.constantBackoff(millis, count)
}
}

override fun onMaster() {
Expand Down Expand Up @@ -172,13 +179,53 @@ class ManagedIndexCoordinator(
private fun enable() {
initBackgroundSweep()
indexStateManagementEnabled = true

// Calling initBackgroundSweep() beforehand runs a sweep ensuring that policies removed from indices
// and indices being deleted are accounted for prior to re-enabling jobs
launch {
try {
logger.debug("Re-enabling jobs for managed indices")
reenableJobs()
} catch (e: Exception) {
logger.error("Failed to re-enable jobs for managed indices", e)
}
}
}

private fun disable() {
scheduledFullSweep?.cancel()
indexStateManagementEnabled = false
}

private suspend fun reenableJobs() {
val clusterStateRequest = ClusterStateRequest()
.clear()
.metaData(true)
.local(false)
.indices("*")
.indicesOptions(IndicesOptions.strictExpand())

val response: ClusterStateResponse = client.admin().cluster().suspendUntil { state(clusterStateRequest, it) }

/*
* Iterate through all indices and create update requests to update the ManagedIndexConfig for indices that
* meet the following conditions:
* 1. Is being managed (has ManagedIndexMetaData)
* 2. Does not have a completed Policy
* 3. Does not have a failed Policy
*/
val updateManagedIndicesRequests: List<DocWriteRequest<*>> = response.state.metaData.indices.mapNotNull {
val managedIndexMetaData = it.value.getManagedIndexMetaData()
if (!(managedIndexMetaData == null || managedIndexMetaData.isPolicyCompleted || managedIndexMetaData.isFailed)) {
updateEnableManagedIndexRequest(it.value.indexUUID)
} else {
null
}
}

updateManagedIndices(updateManagedIndicesRequests, false)
}

private fun isIndexStateManagementEnabled(): Boolean = indexStateManagementEnabled == true

@OpenForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedi
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StateMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.managedIndexConfigIndexRequest
Expand Down Expand Up @@ -108,6 +110,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
private lateinit var xContentRegistry: NamedXContentRegistry
private lateinit var scriptService: ScriptService
private lateinit var settings: Settings
private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED
@Suppress("MagicNumber")
private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)
@Suppress("MagicNumber")
Expand Down Expand Up @@ -148,6 +151,11 @@ object ManagedIndexRunner : ScheduledJobRunner,
clusterService.clusterSettings.addSettingsUpdateConsumer(JOB_INTERVAL) {
jobInterval = it
}

indexStateManagementEnabled = INDEX_STATE_MANAGEMENT_ENABLED.get(settings)
clusterService.clusterSettings.addSettingsUpdateConsumer(INDEX_STATE_MANAGEMENT_ENABLED) {
indexStateManagementEnabled = it
}
return this
}

Expand Down Expand Up @@ -218,6 +226,13 @@ object ManagedIndexRunner : ScheduledJobRunner,
val action: Action? = state?.getActionToExecute(clusterService, scriptService, client, managedIndexMetaData)
val step: Step? = action?.getStepToExecute()

// If Index State Management is disabled and the current step is not null and safe to disable on
// then disable the job and return early
if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) {
disableManagedIndexConfig(managedIndexConfig)
return
}

if (action?.hasTimedOut(managedIndexMetaData.actionMetaData) == true) {
val info = mapOf("message" to "Action timed out")
logger.error("Action=${action.type.type} has timed out")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import java.util.concurrent.TimeUnit

class ManagedIndexSettings {
companion object {
const val DEFAULT_ISM_ENABLED = true
const val DEFAULT_JOB_INTERVAL = 5

val INDEX_STATE_MANAGEMENT_ENABLED = Setting.boolSetting(
"opendistro.index_state_management.enabled",
true,
DEFAULT_ISM_ENABLED,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.elasticsearch.common.io.stream.Writeable
import java.time.Instant
import java.util.Locale

abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMetaData) {
abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMetaData, val isSafeToDisableOn: Boolean = true) {

abstract suspend fun execute()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class WaitForForceMergeStep(
val client: Client,
val config: ForceMergeActionConfig,
managedIndexMetaData: ManagedIndexMetaData
) : Step(name, managedIndexMetaData) {
) : Step(name, managedIndexMetaData, false) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ val ManagedIndexMetaData.isFailed: Boolean
return false
}

// Adding predicate extension to allow cleaner checks since policyCompleted is nullable
val ManagedIndexMetaData.isPolicyCompleted: Boolean
get() = this.policyCompleted == true

/**
* We will change the policy if a change policy exists and if we are currently in
* a Transitions action (which means we're safely at the end of a state). If a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,29 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
}
}

@Suppress("UNCHECKED_CAST")
protected fun getSegmentCount(index: String): Int {
val statsResponse: Map<String, Any> = getStats(index)

// Assert that shard count of stats response is 1 since the stats request being used is at the index level
// (meaning the segment count in the response is aggregated) but segment count for force merge
// (which this method is primarily being used for) is going to be validated per shard
val shardsInfo = statsResponse["_shards"] as Map<String, Int>
assertEquals("Shard count higher than expected", 1, shardsInfo["successful"])

val indicesStats = statsResponse["indices"] as Map<String, Map<String, Map<String, Map<String, Any?>>>>
return indicesStats[index]!!["primaries"]!!["segments"]!!["count"] as Int
}

/** Get stats for [index] */
private fun getStats(index: String): Map<String, Any> {
val response = client().makeRequest("GET", "/$index/_stats")

assertEquals("Stats request failed", RestStatus.OK, response.restStatus())

return response.asMap()
}

@Suppress("UNCHECKED_CAST")
protected fun getIndexState(indexName: String): String {
val request = Request("GET", "/_cluster/state")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase
import com.amazon.opendistroforelasticsearch.indexstatemanagement.makeRequest
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ForceMergeActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification
import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale
Expand Down Expand Up @@ -148,27 +146,4 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {
waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) }
waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) }
}

@Suppress("UNCHECKED_CAST")
private fun getSegmentCount(index: String): Int {
val statsResponse: Map<String, Any> = getStats(index)

// Assert that shard count of stats response is 1 since the stats request being used is at the index level
// (meaning the segment count in the response is aggregated) but segment count for force merge is going to be
// validated per shard
val shardsInfo = statsResponse["_shards"] as Map<String, Int>
assertEquals("Shard count higher than expected", 1, shardsInfo["successful"])

val indicesStats = statsResponse["indices"] as Map<String, Map<String, Map<String, Map<String, Any?>>>>
return indicesStats[index]!!["primaries"]!!["segments"]!!["count"] as Int
}

/** Get stats for [index] */
private fun getStats(index: String): Map<String, Any> {
val response = client().makeRequest("GET", "/$index/_stats")

assertEquals("Stats request failed", RestStatus.OK, response.restStatus())

return response.asMap()
}
}
Loading

0 comments on commit e44f4ce

Please sign in to comment.