Commit b49282d

Fix rolling restarts

HoustonPutman committed Aug 30, 2023
1 parent 8ca4996 commit b49282d
Showing 4 changed files with 175 additions and 113 deletions.
29 changes: 20 additions & 9 deletions controllers/solr_cluster_ops_util.go
@@ -321,21 +321,32 @@ func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler
 		updateLogger.Info("Pod killed for update.", "pod", pod.Name, "reason", "The solr container in the pod has not yet started, thus it is safe to update.")
 	}
 
-	// Pick which pods should be deleted for an update.
-	// Don't exit on an error, which would only occur because of an HTTP Exception. Requeue later instead.
-	additionalPodsToUpdate, podsHaveReplicas, retryLater, clusterStateError :=
-		util.DeterminePodsSafeToUpdate(ctx, instance, int(*statefulSet.Spec.Replicas), outOfDatePods, hasReadyPod, availableUpdatedPodCount, updateLogger)
-	// If we do not have the clusterState, it's not safe to update pods that are running
-	if clusterStateError != nil {
-		retryLater = true
-	} else {
+	// We won't kill pods that we need the cluster state for, but we can kill the pods that are already not running.
+	// This is important for scenarios where there is a bad pod config and nothing is running, but we need to do
+	// a restart to get a working pod config.
+	state, retryLater, apiError := util.GetNodeReplicaState(ctx, instance, hasReadyPod, logger)
+	if apiError != nil {
+		return false, true, 0, apiError
+	} else if !retryLater {
+		// If the cluster status has been successfully fetched, then add the pods scheduled for deletion
+		// This requires the clusterState to be fetched successfully to ensure that we know if there
+		// are replicas living on the pod
 		podsToUpdate = append(podsToUpdate, outOfDatePods.ScheduledForDeletion...)
-		podsToUpdate = append(podsToUpdate, additionalPodsToUpdate...)
+
+		// Pick which pods should be deleted for an update.
+		var additionalPodsToUpdate []corev1.Pod
+		additionalPodsToUpdate, retryLater =
+			util.DeterminePodsSafeToUpdate(instance, int(*statefulSet.Spec.Replicas), outOfDatePods, state, availableUpdatedPodCount, updateLogger)
+		// If we do not have the clusterState, it's not safe to update pods that are running
+		if !retryLater {
+			podsToUpdate = append(podsToUpdate, additionalPodsToUpdate...)
+		}
 	}
 
 	// Only actually delete a running pod if it has been evicted, or doesn't need eviction (persistent storage)
 	for _, pod := range podsToUpdate {
-		retryLaterDurationTemp, inProgTmp, errTemp := DeletePodForUpdate(ctx, r, instance, &pod, podsHaveReplicas[pod.Name], updateLogger)
+		retryLaterDurationTemp, inProgTmp, errTemp := DeletePodForUpdate(ctx, r, instance, &pod, state.PodHasReplicas(instance, pod.Name), updateLogger)
 		requestInProgress = requestInProgress || inProgTmp
 
 		// Use the retryLaterDuration of the pod that requires a retry the soonest (smallest duration > 0)
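The hunk above is the heart of the fix: the cluster state is now fetched once, an API error aborts the reconcile pass, and pod selection only runs when the fetch succeeded, while pods that never started can still be restarted. A minimal, self-contained sketch of that flow, using hypothetical simplified types (the real reconciler works with corev1.Pod and the util package's NodeReplicaState):

package main

import "fmt"

// Hypothetical, simplified stand-ins for the operator's types; illustration only.
type replicaState struct {
	hasReplicas map[string]bool // Solr node/pod name -> replicas present
}

func (s replicaState) podHasReplicas(pod string) bool { return s.hasReplicas[pod] }

// fetchState mimics GetNodeReplicaState: it can fail outright (API error) or
// ask the caller to retry later (no ready pod to query yet).
func fetchState(hasReadyPod bool) (replicaState, bool, error) {
	if !hasReadyPod {
		return replicaState{}, true, nil
	}
	return replicaState{hasReplicas: map[string]bool{"solr-0": true, "solr-1": false}}, false, nil
}

func main() {
	scheduledForDeletion := []string{"solr-1"}
	selectedAsSafe := []string{"solr-0"}

	state, retryLater, err := fetchState(true)
	if err != nil {
		fmt.Println("requeue, cluster state unavailable:", err)
		return
	}
	var podsToUpdate []string
	if !retryLater {
		// Pods already scheduled for deletion go first, then the newly selected pods.
		podsToUpdate = append(podsToUpdate, scheduledForDeletion...)
		podsToUpdate = append(podsToUpdate, selectedAsSafe...)
	}
	for _, pod := range podsToUpdate {
		fmt.Printf("deleting %s for update (hasReplicas=%v)\n", pod, state.podHasReplicas(pod))
	}
}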
1 change: 1 addition & 0 deletions controllers/solr_pod_lifecycle_util.go
@@ -101,6 +101,7 @@ func DeletePodForUpdate(ctx context.Context, r *SolrCloudReconciler, instance *s
 
 	// Delete the pod
 	if deletePod {
+		logger.Error(err, "Deleting solr pod for update", "pod", pod.Name)
 		err = r.Delete(ctx, pod, client.Preconditions{
 			UID: &pod.UID,
 		})
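The delete that follows the new log line uses controller-runtime's UID precondition, so a pod that was already deleted and recreated under the same name cannot be removed by a stale decision. A hedged sketch of that pattern, assuming a configured client.Client and a pod fetched earlier in the reconcile:

package util

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// deletePodGuarded deletes the given pod only if the object currently stored
// at that name still has the UID we inspected; otherwise the API server
// rejects the delete and the reconcile loop can re-evaluate.
func deletePodGuarded(ctx context.Context, c client.Client, pod *corev1.Pod) error {
	return c.Delete(ctx, pod, client.Preconditions{UID: &pod.UID})
}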
176 changes: 105 additions & 71 deletions controllers/util/solr_update_util.go
@@ -96,6 +96,59 @@ func (seg OutOfDatePodSegmentation) IsEmpty() bool {
 	return len(seg.NotStarted)+len(seg.ScheduledForDeletion)+len(seg.Running) == 0
 }
 
+type NodeReplicaState struct {
+	// A map of Solr node name (not pod name) to the contents of that Solr node
+	NodeContents map[string]*SolrNodeContents
+	// A map of unique shard name (collection + shard) to the number of replicas for that shard
+	TotalShardReplicas map[string]int
+	// A map of unique shard name (collection + shard) to the number of non-active replicas for that shard
+	ShardReplicasNotActive map[string]int
+	// Whether all pods are live in the cluster state
+	AllManagedPodsLive bool
+}
+
+// PodContents is a helper method to get the node contents for a particular pod
+func (state NodeReplicaState) PodContents(cloud *solr.SolrCloud, podName string) (contents *SolrNodeContents, isInClusterState bool) {
+	contents, isInClusterState = state.NodeContents[SolrNodeName(cloud, podName)]
+	return
+}
+
+// PodHasReplicas is a helper method to retrieve whether a pod has replicas living on it
+func (state NodeReplicaState) PodHasReplicas(cloud *solr.SolrCloud, podName string) bool {
+	contents, isInClusterState := state.PodContents(cloud, podName)
+	return isInClusterState && contents.replicas > 0
+}
+
+func GetNodeReplicaState(ctx context.Context, cloud *solr.SolrCloud, hasReadyPod bool, logger logr.Logger) (state NodeReplicaState, retryLater bool, err error) {
+	clusterResp := &solr_api.SolrClusterStatusResponse{}
+	overseerResp := &solr_api.SolrOverseerStatusResponse{}
+
+	if hasReadyPod {
+		queryParams := url.Values{}
+		queryParams.Add("action", "CLUSTERSTATUS")
+		err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, clusterResp)
+		if _, apiErr := solr_api.CheckForCollectionsApiError("CLUSTERSTATUS", clusterResp.ResponseHeader, clusterResp.Error); apiErr != nil {
+			err = apiErr
+		}
+		if err == nil {
+			queryParams.Set("action", "OVERSEERSTATUS")
+			err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, overseerResp)
+			if _, apiErr := solr_api.CheckForCollectionsApiError("OVERSEERSTATUS", overseerResp.ResponseHeader, overseerResp.Error); apiErr != nil {
+				err = apiErr
+			}
+		}
+		if err == nil {
+			state = findSolrNodeContents(clusterResp.ClusterStatus, overseerResp.Leader, GetAllManagedSolrNodeNames(cloud))
+		} else {
+			logger.Error(err, "Could not fetch cluster state information for cloud")
+		}
+	} else {
+		retryLater = true
+	}
+
+	return
+}
+
 // DeterminePodsSafeToUpdate takes a list of solr Pods and returns a list of pods that are safe to upgrade now.
 // This function MUST be idempotent and return the same list of pods given the same kubernetes/solr state.
 //
@@ -105,50 +158,37 @@ func (seg OutOfDatePodSegmentation) IsEmpty() bool {
 // TODO:
 //   - Think about caching this for ~250 ms? Not a huge need to send these requests milliseconds apart.
 //   - Might be too much complexity for very little gain.
-func DeterminePodsSafeToUpdate(ctx context.Context, cloud *solr.SolrCloud, totalPods int, outOfDatePods OutOfDatePodSegmentation, hasReadyPod bool, availableUpdatedPodCount int, logger logr.Logger) (podsToUpdate []corev1.Pod, podsHaveReplicas map[string]bool, retryLater bool, err error) {
+func DeterminePodsSafeToUpdate(cloud *solr.SolrCloud, totalPods int, outOfDatePods OutOfDatePodSegmentation, state NodeReplicaState, availableUpdatedPodCount int, logger logr.Logger) (podsToUpdate []corev1.Pod, retryLater bool) {
 	// Before fetching the cluster state, be sure that there is room to update at least 1 pod
 	maxPodsUnavailable, unavailableUpdatedPodCount, maxPodsToUpdate := calculateMaxPodsToUpdate(cloud, totalPods, len(outOfDatePods.Running), len(outOfDatePods.NotStarted)+len(outOfDatePods.ScheduledForDeletion), availableUpdatedPodCount)
 
 	if maxPodsToUpdate <= 0 {
-		logger.Info("Pod update selection canceled. The number of updated pods unavailable equals or exceeds the calculated maxPodsUnavailable.",
-			"unavailableUpdatedPods", unavailableUpdatedPodCount, "outOfDatePodsNotStarted", len(outOfDatePods.NotStarted), "alreadyScheduledForDeletion", len(outOfDatePods.ScheduledForDeletion), "maxPodsUnavailable", maxPodsUnavailable)
-	} else {
-		clusterResp := &solr_api.SolrClusterStatusResponse{}
-		overseerResp := &solr_api.SolrOverseerStatusResponse{}
-
-		if hasReadyPod {
-			queryParams := url.Values{}
-			queryParams.Add("action", "CLUSTERSTATUS")
-			err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, clusterResp)
-			if err == nil {
-				queryParams.Set("action", "OVERSEERSTATUS")
-				err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, overseerResp)
-				if _, apiErr := solr_api.CheckForCollectionsApiError("OVERSEERSTATUS", overseerResp.ResponseHeader, overseerResp.Error); apiErr != nil {
-					err = apiErr
-				}
-			}
-			if err != nil {
-				logger.Error(err, "Error retrieving cluster status, delaying pod update selection")
-			}
-		}
-		// If the update logic already wants to retry later, then do not pick any pods
-		if !retryLater {
-			logger.Info("Pod update selection started.",
-				"outOfDatePods", len(outOfDatePods.Running),
-				"maxPodsUnavailable", maxPodsUnavailable,
-				"unavailableUpdatedPods", unavailableUpdatedPodCount,
-				"outOfDatePodsNotStarted", len(outOfDatePods.NotStarted),
-				"alreadyScheduledForDeletion", len(outOfDatePods.ScheduledForDeletion),
-				"maxPodsToUpdate", maxPodsToUpdate)
-			podsToUpdate, podsHaveReplicas = pickPodsToUpdate(cloud, outOfDatePods, clusterResp.ClusterStatus, overseerResp.Leader, maxPodsToUpdate, logger)
-
-			// If there are no pods to upgrade, even though the maxPodsToUpdate is >0, then retry later because the issue stems from cluster state
-			// and clusterState changes will not call the reconciler.
-			if len(podsToUpdate) == 0 && len(outOfDatePods.Running) > 0 {
-				retryLater = true
-			}
-		}
+		logger.Info("Pod update selection not started. The number of updated pods unavailable (or scheduled for deletion) equals or exceeds the calculated maxPodsUnavailable.",
+			"outOfDatePods", len(outOfDatePods.Running),
+			"maxPodsUnavailable", maxPodsUnavailable,
+			"unavailableUpdatedPods", unavailableUpdatedPodCount,
+			"outOfDatePodsNotStarted", len(outOfDatePods.NotStarted),
+			"alreadyScheduledForDeletion", len(outOfDatePods.ScheduledForDeletion))
+		retryLater = true
+	}
+	// If the update logic already wants to retry later, then do not pick any pods
+	if !retryLater {
+		logger.Info("Pod update selection started.",
+			"outOfDatePods", len(outOfDatePods.Running),
+			"maxPodsUnavailable", maxPodsUnavailable,
+			"unavailableUpdatedPods", unavailableUpdatedPodCount,
+			"outOfDatePodsNotStarted", len(outOfDatePods.NotStarted),
+			"alreadyScheduledForDeletion", len(outOfDatePods.ScheduledForDeletion),
+			"maxPodsToUpdate", maxPodsToUpdate)
+		podsToUpdate = pickPodsToUpdate(cloud, outOfDatePods, state, maxPodsToUpdate, logger)
+
+		// If there are no pods to upgrade, even though the maxPodsToUpdate is >0, then retry later because the issue stems from cluster state
+		// and clusterState changes will not call the reconciler.
+		if len(podsToUpdate) == 0 && len(outOfDatePods.Running) > 0 {
+			retryLater = true
+		}
 	}
-	return podsToUpdate, podsHaveReplicas, retryLater, err
+	return podsToUpdate, retryLater
 }
 
 // calculateMaxPodsToUpdate determines the maximum number of additional pods that can be updated.
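An aside on the arithmetic: the body of calculateMaxPodsToUpdate is not shown in this diff, so the sketch below is inferred from its signature, call site, and return values, and should be treated as an assumption rather than the source:

package main

import "fmt"

// Inferred sketch: pods that are already unavailable (updated-but-not-ready,
// not yet started, or scheduled for deletion) consume the unavailability
// budget before any new pod may be selected.
func sketchMaxPodsToUpdate(maxPodsUnavailable, unavailableUpdatedPodCount, podsAlreadyGoingDown int) int {
	return maxPodsUnavailable - unavailableUpdatedPodCount - podsAlreadyGoingDown
}

func main() {
	// 25% of an 8-pod cloud -> maxPodsUnavailable = 2; one updated pod is not
	// ready yet and one out-of-date pod never started, so the budget is 0,
	// selection logs "not started", and the reconcile retries later.
	fmt.Println(sketchMaxPodsToUpdate(2, 1, 1)) // 0
}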
@@ -164,38 +204,32 @@ func calculateMaxPodsToUpdate(cloud *solr.SolrCloud, totalPods int, outOfDatePod
 	return maxPodsUnavailable, unavailableUpdatedPodCount, maxPodsToUpdate
 }
 
-func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods OutOfDatePodSegmentation, clusterStatus solr_api.SolrClusterStatus,
-	overseer string, maxPodsToUpdate int, logger logr.Logger) (podsToUpdate []corev1.Pod, podsHaveReplicas map[string]bool) {
-	podsHaveReplicas = make(map[string]bool, maxPodsToUpdate)
-	nodeContents, totalShardReplicas, shardReplicasNotActive, allManagedPodsLive := findSolrNodeContents(clusterStatus, overseer, GetAllManagedSolrNodeNames(cloud))
-	sortNodePodsBySafety(outOfDatePods.Running, nodeContents, cloud)
+func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods OutOfDatePodSegmentation, state NodeReplicaState, maxPodsToUpdate int, logger logr.Logger) (podsToUpdate []corev1.Pod) {
+	sortNodePodsBySafety(outOfDatePods.Running, state.NodeContents, cloud)
 
 	updateOptions := cloud.Spec.UpdateStrategy.ManagedUpdateOptions
 	var maxShardReplicasUnavailableCache map[string]int
 	// In case the user wants all shardReplicas to be unavailable at the same time, populate the cache with the total number of replicas per shard.
 	if updateOptions.MaxShardReplicasUnavailable != nil && updateOptions.MaxShardReplicasUnavailable.Type == intstr.Int && updateOptions.MaxShardReplicasUnavailable.IntVal <= int32(0) {
-		maxShardReplicasUnavailableCache = totalShardReplicas
+		maxShardReplicasUnavailableCache = state.TotalShardReplicas
 	} else {
-		maxShardReplicasUnavailableCache = make(map[string]int, len(totalShardReplicas))
+		maxShardReplicasUnavailableCache = make(map[string]int, len(state.TotalShardReplicas))
 	}
 
 	for _, pod := range outOfDatePods.ScheduledForDeletion {
-		nodeName := SolrNodeName(cloud, pod.Name)
-		nodeContent, isInClusterState := nodeContents[nodeName]
+		nodeContent, isInClusterState := state.PodContents(cloud, pod.Name)
 
 		// This pod will be deleted, add its information to future down shards
-		podsHaveReplicas[pod.Name] = isInClusterState && nodeContent.replicas > 0
 		if isInClusterState && nodeContent.live {
 			for shard, additionalReplicaCount := range nodeContent.activeReplicasPerShard {
-				shardReplicasNotActive[shard] += additionalReplicaCount
+				state.ShardReplicasNotActive[shard] += additionalReplicaCount
 			}
 		}
 	}
 
 	for _, pod := range outOfDatePods.Running {
 		isSafeToUpdate := true
-		nodeName := SolrNodeName(cloud, pod.Name)
-		nodeContent, isInClusterState := nodeContents[nodeName]
+		nodeContent, isInClusterState := state.PodContents(cloud, pod.Name)
 		var reason string
 		// The overseerLeader can only be upgraded by itself
 		if !isInClusterState || !nodeContent.InClusterState() {
@@ -210,7 +244,7 @@ func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods OutOfDatePodSegmentat
 			// But we want to make sure it still follows the same replicasDown rules as the other nodes, so still use that logic
 			// This works if there are other solr nodes not managed by this SolrCloud resource, because we just check that this is the last
 			// pod managed for this SolrCloud that has not been updated.
-			if len(outOfDatePods.Running) == 1 && allManagedPodsLive {
+			if len(outOfDatePods.Running) == 1 && state.AllManagedPodsLive {
 				isSafeToUpdate = true
 				reason = "Pod is overseer and all other nodes have been updated."
 			} else {
@@ -231,10 +265,10 @@ func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods OutOfDatePodSegmentat
 					continue
 				}
 
-				notActiveReplicaCount, _ := shardReplicasNotActive[shard]
+				notActiveReplicaCount, _ := state.ShardReplicasNotActive[shard]
 
 				// If the maxBatchNodeUpgradeSpec is passed as a decimal between 0 and 1, then calculate as a percentage of the number of nodes
-				maxShardReplicasDown, _ := ResolveMaxShardReplicasUnavailable(updateOptions.MaxShardReplicasUnavailable, shard, totalShardReplicas, maxShardReplicasUnavailableCache)
+				maxShardReplicasDown, _ := ResolveMaxShardReplicasUnavailable(updateOptions.MaxShardReplicasUnavailable, shard, state.TotalShardReplicas, maxShardReplicasUnavailableCache)
 
 				// We have to allow killing of Pods that have multiple replicas of a shard
 				// Therefore only check the additional Replica count if some replicas of that shard are already being upgraded
@@ -257,12 +291,11 @@ func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods OutOfDatePodSegmentat
 			// If the node is not "live", then the replicas on that node will have already been counted as "not active".
 			if isInClusterState && nodeContent.live {
 				for shard, additionalReplicaCount := range nodeContent.activeReplicasPerShard {
-					shardReplicasNotActive[shard] += additionalReplicaCount
+					state.ShardReplicasNotActive[shard] += additionalReplicaCount
 				}
 			}
 			logger.Info("Pod selected to be deleted for update.", "pod", pod.Name, "reason", reason)
 			podsToUpdate = append(podsToUpdate, pod)
-			podsHaveReplicas[pod.Name] = isInClusterState && nodeContent.replicas > 0
 
 			// Stop after the maxBatchNodeUpdate count, if one is provided.
 			if maxPodsToUpdate >= 1 && len(podsToUpdate) >= maxPodsToUpdate {
@@ -273,7 +306,7 @@ func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods OutOfDatePodSegmentat
 			logger.Info("Pod not able to be killed for update.", "pod", pod.Name, "reason", reason)
 		}
 	}
-	return podsToUpdate, podsHaveReplicas
+	return podsToUpdate
 }
 
 func sortNodePodsBySafety(outOfDatePods []corev1.Pod, nodeMap map[string]*SolrNodeContents, solrCloud *solr.SolrCloud) {
@@ -364,13 +397,13 @@ This aggregated info is returned as:
   - A map from unique shard name (collection+shard) to the count of replicas that are not active for that shard.
     - If a node is not live, then all shards that live on that node will be considered "not active"
 */
-func findSolrNodeContents(cluster solr_api.SolrClusterStatus, overseerLeader string, managedSolrNodeNames map[string]bool) (nodeContents map[string]*SolrNodeContents, totalShardReplicas map[string]int, shardReplicasNotActive map[string]int, allManagedPodsLive bool) {
-	nodeContents = make(map[string]*SolrNodeContents, 0)
-	totalShardReplicas = make(map[string]int, 0)
-	shardReplicasNotActive = make(map[string]int, 0)
+func findSolrNodeContents(cluster solr_api.SolrClusterStatus, overseerLeader string, managedSolrNodeNames map[string]bool) (state NodeReplicaState) {
+	state.NodeContents = make(map[string]*SolrNodeContents, 0)
+	state.TotalShardReplicas = make(map[string]int, 0)
+	state.ShardReplicasNotActive = make(map[string]int, 0)
 	// Update the info for each "live" node.
 	for _, nodeName := range cluster.LiveNodes {
-		contents, hasValue := nodeContents[nodeName]
+		contents, hasValue := state.NodeContents[nodeName]
 		delete(managedSolrNodeNames, nodeName)
 		if !hasValue {
 			contents = &SolrNodeContents{
@@ -386,15 +419,15 @@ func findSolrNodeContents(cluster solr_api.SolrClusterStatus, overseerLeader str
 		} else {
 			contents.live = true
 		}
-		nodeContents[nodeName] = contents
+		state.NodeContents[nodeName] = contents
 	}
 	// Go through the state of each collection getting the count of replicas for each collection/shard living on each node
 	for collectionName, collection := range cluster.Collections {
 		for shardName, shard := range collection.Shards {
 			uniqueShard := collectionName + "|" + shardName
-			totalShardReplicas[uniqueShard] = len(shard.Replicas)
+			state.TotalShardReplicas[uniqueShard] = len(shard.Replicas)
 			for _, replica := range shard.Replicas {
-				contents, hasValue := nodeContents[replica.NodeName]
+				contents, hasValue := state.NodeContents[replica.NodeName]
 				if !hasValue {
 					contents = &SolrNodeContents{
 						nodeName: replica.NodeName,
@@ -415,7 +448,7 @@ func findSolrNodeContents(cluster solr_api.SolrClusterStatus, overseerLeader str
 
 				// A replica can be considered "not active" if its state is not "active" or the node it lives in is not "live".
 				if !(replica.State == solr_api.ReplicaActive && contents.live) {
-					shardReplicasNotActive[uniqueShard] += 1
+					state.ShardReplicasNotActive[uniqueShard] += 1
 				}
 				if replica.State == solr_api.ReplicaActive {
 					contents.activeReplicasPerShard[uniqueShard] += 1
@@ -427,13 +460,13 @@ func findSolrNodeContents(cluster solr_api.SolrClusterStatus, overseerLeader str
 					contents.notDownReplicas += 1
 				}
 
-				nodeContents[replica.NodeName] = contents
+				state.NodeContents[replica.NodeName] = contents
 			}
 		}
 	}
 	// Update the info for the overseer leader.
 	if overseerLeader != "" {
-		contents, hasValue := nodeContents[overseerLeader]
+		contents, hasValue := state.NodeContents[overseerLeader]
 		if !hasValue {
 			contents = &SolrNodeContents{
 				nodeName: overseerLeader,
@@ -447,9 +480,10 @@ func findSolrNodeContents(cluster solr_api.SolrClusterStatus, overseerLeader str
 		} else {
 			contents.overseerLeader = true
 		}
-		nodeContents[overseerLeader] = contents
+		state.NodeContents[overseerLeader] = contents
 	}
-	return nodeContents, totalShardReplicas, shardReplicasNotActive, len(managedSolrNodeNames) == 0
+	state.AllManagedPodsLive = len(managedSolrNodeNames) == 0
+	return state
 }
 
 type SolrNodeContents struct {
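Taken together, findSolrNodeContents now returns a single NodeReplicaState value instead of four parallel results. A hedged, self-contained sketch of how that aggregate is consumed, with simplified types and made-up node names rather than the operator's real solr_api structs:

package main

import "fmt"

// Simplified stand-ins for NodeReplicaState and SolrNodeContents.
type nodeContents struct {
	replicas int
	live     bool
}

type nodeReplicaState struct {
	nodeContents           map[string]*nodeContents
	totalShardReplicas     map[string]int
	shardReplicasNotActive map[string]int
	allManagedPodsLive     bool
}

// podHasReplicas mirrors NodeReplicaState.PodHasReplicas: a pod that is absent
// from the cluster state, or present with zero replicas, can be restarted
// without consulting per-shard availability budgets.
func (s nodeReplicaState) podHasReplicas(node string) bool {
	c, ok := s.nodeContents[node]
	return ok && c.replicas > 0
}

func main() {
	state := nodeReplicaState{
		nodeContents: map[string]*nodeContents{
			"solr-0.solr-headless:8983_solr": {replicas: 2, live: true},
			"solr-1.solr-headless:8983_solr": {replicas: 0, live: true},
		},
		totalShardReplicas:     map[string]int{"books|shard1": 2},
		shardReplicasNotActive: map[string]int{"books|shard1": 0},
		allManagedPodsLive:     true,
	}
	fmt.Println(state.podHasReplicas("solr-0.solr-headless:8983_solr")) // true
	fmt.Println(state.podHasReplicas("solr-1.solr-headless:8983_solr")) // false
}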