From 79686a4b2ec64963666d7c14c36e535a158ca3f3 Mon Sep 17 00:00:00 2001
From: AiRanthem
Date: Wed, 25 Sep 2024 11:49:44 +0800
Subject: [PATCH] refactor subset adapter

Signed-off-by: AiRanthem
---
 .../uniteddeployment/adapter/adapter.go       | 13 +++--
 .../uniteddeployment/adapter/adapter_util.go  | 17 +++++++
 .../adapter/advanced_statefulset_adapter.go   | 32 +++++++------
 .../adapter/cloneset_adapter.go               | 33 ++++++-------
 .../adapter/deployment_adapter.go             | 32 +++++++------
 .../adapter/statefulset_adapter.go            | 48 +++++++------------
 pkg/controller/uniteddeployment/allocator.go  |  4 +-
 .../uniteddeployment/subset_control.go        | 33 ++++++-------
 .../uniteddeployment_controller.go            | 18 +++----
 pkg/controller/util/pod_condition_utils.go    |  4 +-
 pkg/controller/workloadspread/reschedule.go   |  2 +-
 11 files changed, 127 insertions(+), 109 deletions(-)

diff --git a/pkg/controller/uniteddeployment/adapter/adapter.go b/pkg/controller/uniteddeployment/adapter/adapter.go
index e2d66c5bc4..12e3a44207 100644
--- a/pkg/controller/uniteddeployment/adapter/adapter.go
+++ b/pkg/controller/uniteddeployment/adapter/adapter.go
@@ -32,9 +32,16 @@ type Adapter interface {
 	NewResourceListObject() client.ObjectList
 	// GetStatusObservedGeneration returns the observed generation of the subset.
 	GetStatusObservedGeneration(subset metav1.Object) int64
-	// GetReplicaDetails returns the replicas information of the subset status.
-	GetReplicaDetails(subset metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas,
-		statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error)
+	// GetSubsetPods returns all pods of the subset workload.
+	GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error)
+	// GetSpecReplicas returns the desired replicas from the subset workload's spec.
+	GetSpecReplicas(obj metav1.Object) *int32
+	// GetSpecPartition returns the current partition of the subset workload, if any.
+	GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32
+	// GetStatusReplicas returns the replicas from the subset workload status.
+	GetStatusReplicas(obj metav1.Object) int32
+	// GetStatusReadyReplicas returns the number of ready replicas from the subset workload status.
+	GetStatusReadyReplicas(obj metav1.Object) int32
 	// GetSubsetFailure returns failure information of the subset.
 	GetSubsetFailure() *string
 	// ApplySubsetTemplate updates the subset to the latest revision.
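[Editorial note — not part of the patch] The monolithic GetReplicaDetails call returned an eight-value tuple; the interface above splits it into five focused getters plus the shared CalculateUpdatedReplicas helper introduced in adapter_util.go below. A minimal sketch of how a caller can recompose the old tuple under the new interface — the names ad, set, and updatedRevision are hypothetical; the real consumer is convertToSubset in subset_control.go further down:

	// Sketch only: rebuilding the old GetReplicaDetails results from the new getters.
	pods, err := ad.GetSubsetPods(set) // all pods claimed by the subset workload
	if err != nil {
		return err
	}
	specReplicas := ad.GetSpecReplicas(set)         // *int32, nil when unset
	specPartition := ad.GetSpecPartition(set, pods) // *int32, nil when the workload has no partition
	statusReplicas := ad.GetStatusReplicas(set)
	statusReadyReplicas := ad.GetStatusReadyReplicas(set)
	updatedReplicas, updatedReadyReplicas := CalculateUpdatedReplicas(pods, updatedRevision)
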
diff --git a/pkg/controller/uniteddeployment/adapter/adapter_util.go b/pkg/controller/uniteddeployment/adapter/adapter_util.go
index 3f761a02a1..30b4faefd2 100644
--- a/pkg/controller/uniteddeployment/adapter/adapter_util.go
+++ b/pkg/controller/uniteddeployment/adapter/adapter_util.go
@@ -22,6 +22,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/validation"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 
 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
 )
@@ -97,3 +98,19 @@ func getCurrentPartition(pods []*corev1.Pod, revision string) *int32 {
 
 	return &partition
 }
+
+func CalculateUpdatedReplicas(podList []*corev1.Pod, updatedRevision string) (updatedReplicas, updatedReadyReplicas int32) {
+	for _, pod := range podList {
+		revision := getRevision(&pod.ObjectMeta)
+
+		// Only count pods that are updated and are not terminating
+		if revision == updatedRevision && pod.GetDeletionTimestamp() == nil {
+			updatedReplicas++
+			if podutil.IsPodReady(pod) {
+				updatedReadyReplicas++
+			}
+		}
+	}
+
+	return
+}
diff --git a/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go b/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go
index 24cd0b1d03..6cc42f4cfb 100644
--- a/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go
+++ b/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go
@@ -62,28 +62,32 @@ func (a *AdvancedStatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Obje
 	return obj.(*v1beta1.StatefulSet).Status.ObservedGeneration
 }
 
-// GetReplicaDetails returns the replicas detail the subset needs.
-func (a *AdvancedStatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {
-	set := obj.(*v1beta1.StatefulSet)
-	pods, err = a.getStatefulSetPods(set)
-	if err != nil {
-		return
-	}
+func (a *AdvancedStatefulSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
+	return a.getStatefulSetPods(obj.(*v1beta1.StatefulSet))
+}
+
+func (a *AdvancedStatefulSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
+	return obj.(*v1beta1.StatefulSet).Spec.Replicas
+}
 
-	specReplicas = set.Spec.Replicas
+func (a *AdvancedStatefulSetAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 {
+	set := obj.(*v1beta1.StatefulSet)
 	if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
 		revision := getRevision(&set.ObjectMeta)
-		specPartition = getCurrentPartition(pods, revision)
+		return getCurrentPartition(pods, revision)
 	} else if set.Spec.UpdateStrategy.RollingUpdate != nil &&
 		set.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
-		specPartition = set.Spec.UpdateStrategy.RollingUpdate.Partition
+		return set.Spec.UpdateStrategy.RollingUpdate.Partition
 	}
+	return nil
+}
 
-	statusReplicas = set.Status.Replicas
-	statusReadyReplicas = set.Status.ReadyReplicas
-	statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
+func (a *AdvancedStatefulSetAdapter) GetStatusReplicas(obj metav1.Object) int32 {
+	return obj.(*v1beta1.StatefulSet).Status.Replicas
+}
 
-	return
+func (a *AdvancedStatefulSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
+	return obj.(*v1beta1.StatefulSet).Status.ReadyReplicas
 }
 
 // GetSubsetFailure returns the failure information of the subset.
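[Editorial note — not part of the patch] CalculateUpdatedReplicas, moved above into adapter_util.go, is now the single shared implementation every adapter feeds into convertToSubset. A hedged, self-contained sketch of its behavior; it assumes getRevision reads the controller-revision-hash label (appsv1.ControllerRevisionHashLabelKey from k8s.io/api/apps/v1), which is how the surrounding code identifies a pod's revision:

	// Sketch only: a ready pod and a terminating pod, both on revision "rev-2".
	// The terminating pod is skipped even though it carries the updated revision,
	// so the result is updated == 1, updatedReady == 1.
	now := metav1.Now()
	ready := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{appsv1.ControllerRevisionHashLabelKey: "rev-2"},
		},
		Status: corev1.PodStatus{
			Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
		},
	}
	terminating := ready.DeepCopy()
	terminating.DeletionTimestamp = &now
	updated, updatedReady := CalculateUpdatedReplicas([]*corev1.Pod{ready, terminating}, "rev-2")
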
diff --git a/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go b/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go
index a570d2b1cc..7b25f4bd21 100644
--- a/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go
+++ b/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go
@@ -41,28 +41,29 @@ func (a *CloneSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 {
 	return obj.(*alpha1.CloneSet).Status.ObservedGeneration
 }
 
-func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {
-
-	set := obj.(*alpha1.CloneSet)
-
-	pods, err = a.getCloneSetPods(set)
-
-	if err != nil {
-		return
-	}
+func (a *CloneSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
+	return a.getCloneSetPods(obj.(*alpha1.CloneSet))
+}
 
-	specReplicas = set.Spec.Replicas
+func (a *CloneSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
+	return obj.(*alpha1.CloneSet).Spec.Replicas
+}
 
+func (a *CloneSetAdapter) GetSpecPartition(obj metav1.Object, _ []*corev1.Pod) *int32 {
+	set := obj.(*alpha1.CloneSet)
 	if set.Spec.UpdateStrategy.Partition != nil {
-		partition, _ := intstr.GetValueFromIntOrPercent(set.Spec.UpdateStrategy.Partition, int(*set.Spec.Replicas), true)
-		specPartition = utilpointer.Int32Ptr(int32(partition))
+		partition, _ := intstr.GetScaledValueFromIntOrPercent(set.Spec.UpdateStrategy.Partition, int(*set.Spec.Replicas), true)
+		return utilpointer.Int32Ptr(int32(partition))
 	}
+	return nil
+}
 
-	statusReplicas = set.Status.Replicas
-	statusReadyReplicas = set.Status.ReadyReplicas
-	statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
+func (a *CloneSetAdapter) GetStatusReplicas(obj metav1.Object) int32 {
+	return obj.(*alpha1.CloneSet).Status.Replicas
+}
 
-	return
+func (a *CloneSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
+	return obj.(*alpha1.CloneSet).Status.ReadyReplicas
 }
 
 func (a *CloneSetAdapter) GetSubsetFailure() *string {
diff --git a/pkg/controller/uniteddeployment/adapter/deployment_adapter.go b/pkg/controller/uniteddeployment/adapter/deployment_adapter.go
index 1ed391c5c8..e1fe7024dc 100644
--- a/pkg/controller/uniteddeployment/adapter/deployment_adapter.go
+++ b/pkg/controller/uniteddeployment/adapter/deployment_adapter.go
@@ -57,24 +57,28 @@ func (a *DeploymentAdapter) GetStatusObservedGeneration(obj metav1.Object) int64
 	return obj.(*appsv1.Deployment).Status.ObservedGeneration
 }
 
-// GetReplicaDetails returns the replicas detail the subset needs.
-func (a *DeploymentAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {
-	// Convert to Deployment Object
+func (a *DeploymentAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
 	set := obj.(*appsv1.Deployment)
+	return a.getDeploymentPods(set)
+}
 
-	// Get all pods belonging to deployment
-	pods, err = a.getDeploymentPods(set)
-	if err != nil {
-		return
-	}
+func (a *DeploymentAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
+	set := obj.(*appsv1.Deployment)
+	return set.Spec.Replicas
+}
 
-	// Set according replica counts
-	specReplicas = set.Spec.Replicas
-	statusReplicas = set.Status.Replicas
-	statusReadyReplicas = set.Status.ReadyReplicas
-	statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
+func (a *DeploymentAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 {
+	return nil // Deployments have no partition concept
+}
+
+func (a *DeploymentAdapter) GetStatusReplicas(obj metav1.Object) int32 {
+	set := obj.(*appsv1.Deployment)
+	return set.Status.Replicas
+}
 
-	return
+func (a *DeploymentAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
+	set := obj.(*appsv1.Deployment)
+	return set.Status.ReadyReplicas
 }
 
 // GetSubsetFailure returns the failure information of the subset.
diff --git a/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go b/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go
index eb4761d862..1952af5bb5 100644
--- a/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go
+++ b/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go
@@ -58,28 +58,32 @@ func (a *StatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int6
 	return obj.(*appsv1.StatefulSet).Status.ObservedGeneration
 }
 
-// GetReplicaDetails returns the replicas detail the subset needs.
-func (a *StatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {
-	set := obj.(*appsv1.StatefulSet)
-	pods, err = a.getStatefulSetPods(set)
-	if err != nil {
-		return
-	}
+func (a *StatefulSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
+	return a.getStatefulSetPods(obj.(*appsv1.StatefulSet))
+}
 
-	specReplicas = set.Spec.Replicas
+func (a *StatefulSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
+	return obj.(*appsv1.StatefulSet).Spec.Replicas
+}
+
+func (a *StatefulSetAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 {
+	set := obj.(*appsv1.StatefulSet)
 	if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
 		revision := getRevision(&set.ObjectMeta)
-		specPartition = getCurrentPartition(pods, revision)
+		return getCurrentPartition(pods, revision)
 	} else if set.Spec.UpdateStrategy.RollingUpdate != nil &&
 		set.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
-		specPartition = set.Spec.UpdateStrategy.RollingUpdate.Partition
+		return set.Spec.UpdateStrategy.RollingUpdate.Partition
 	}
+	return nil
+}
 
-	statusReplicas = set.Status.Replicas
-	statusReadyReplicas = set.Status.ReadyReplicas
-	statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
+func (a *StatefulSetAdapter) GetStatusReplicas(obj metav1.Object) int32 {
+	return obj.(*appsv1.StatefulSet).Status.Replicas
+}
 
-	return
+func (a *StatefulSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
+	return obj.(*appsv1.StatefulSet).Status.ReadyReplicas
 }
 
 // GetSubsetFailure returns the failure information of the subset.
@@ -231,22 +235,6 @@ func (a *StatefulSetAdapter) getStatefulSetPods(set *appsv1.StatefulSet) ([]*cor
 	return claimedPods, nil
 }
 
-func calculateUpdatedReplicas(podList []*corev1.Pod, updatedRevision string) (updatedReplicas, updatedReadyReplicas int32) {
-	for _, pod := range podList {
-		revision := getRevision(&pod.ObjectMeta)
-
-		// Only count pods that are updated and are not terminating
-		if revision == updatedRevision && pod.GetDeletionTimestamp() == nil {
-			updatedReplicas++
-			if podutil.IsPodReady(pod) {
-				updatedReadyReplicas++
-			}
-		}
-	}
-
-	return
-}
-
 // deleteStuckPods tries to work around the blocking issue https://github.com/kubernetes/kubernetes/issues/67250
 func (a *StatefulSetAdapter) deleteStuckPods(set *appsv1.StatefulSet, revision string, partition int32) error {
 	pods, err := a.getStatefulSetPods(set)
diff --git a/pkg/controller/uniteddeployment/allocator.go b/pkg/controller/uniteddeployment/allocator.go
index 9b19fc42fc..8c0192dfa3 100644
--- a/pkg/controller/uniteddeployment/allocator.go
+++ b/pkg/controller/uniteddeployment/allocator.go
@@ -83,7 +83,7 @@ func getNotPendingReplicasMap(nameToSubset *map[string]*Subset) map[string]int32
 	return result
 }
 
-func getSubSetUnschedulable(name string, nameToSubset *map[string]*Subset) (unschedulable bool) {
+func isSubSetUnschedulable(name string, nameToSubset *map[string]*Subset) (unschedulable bool) {
 	if subsetObj, ok := (*nameToSubset)[name]; ok {
 		unschedulable = subsetObj.Status.UnschedulableStatus.Unschedulable
 	} else {
@@ -303,7 +303,7 @@ func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameTo
 			maxReplicas, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas)
 		}
 		if ac.Spec.Topology.ScheduleStrategy.IsAdaptive() {
-			unschedulable := getSubSetUnschedulable(subset.Name, nameToSubset)
+			unschedulable := isSubSetUnschedulable(subset.Name, nameToSubset)
 			// This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up.
 			if notPendingReplicas, ok := notPendingReplicasMap[subset.Name]; unschedulable && ok {
 				klog.InfoS("Assign min(notPendingReplicas, minReplicas/maxReplicas) for unschedulable subset",
diff --git a/pkg/controller/uniteddeployment/subset_control.go b/pkg/controller/uniteddeployment/subset_control.go
index 8a85a869e6..c8945359ac 100644
--- a/pkg/controller/uniteddeployment/subset_control.go
+++ b/pkg/controller/uniteddeployment/subset_control.go
@@ -132,11 +132,6 @@ func (m *SubsetControl) IsExpected(subSet *Subset, revision string) bool {
 }
 
 func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision string) (*Subset, error) {
-	subSetName, err := getSubsetNameFrom(set)
-	if err != nil {
-		return nil, err
-	}
-
 	subset := &Subset{}
 	subset.ObjectMeta = metav1.ObjectMeta{
 		Name:            set.GetName(),
@@ -154,30 +149,32 @@ func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision strin
 		OwnerReferences: set.GetOwnerReferences(),
 		Finalizers:      set.GetFinalizers(),
 	}
-	subset.Spec.SubsetName = subSetName
 
-	specReplicas, specPartition, statusReplicas,
-		statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas, pods, err := m.adapter.GetReplicaDetails(set, updatedRevision)
+	pods, err := m.adapter.GetSubsetPods(set)
+	if err != nil {
+		return nil, err
+	}
+	subset.Spec.SubsetPods = pods
+
+	subSetName, err := getSubsetNameFrom(set)
 	if err != nil {
-		return subset, err
+		return nil, err
 	}
+	subset.Spec.SubsetName = subSetName
 
-	if specReplicas != nil {
+	if specReplicas := m.adapter.GetSpecReplicas(set); specReplicas != nil {
 		subset.Spec.Replicas = *specReplicas
 	}
 
-	if specPartition != nil {
+	if specPartition := m.adapter.GetSpecPartition(set, pods); specPartition != nil {
 		subset.Spec.UpdateStrategy.Partition = *specPartition
 	}
+	subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set)
 
 	subset.Status.ObservedGeneration = m.adapter.GetStatusObservedGeneration(set)
-	subset.Status.Replicas = statusReplicas
-	subset.Status.ReadyReplicas = statusReadyReplicas
-	subset.Status.UpdatedReplicas = statusUpdatedReplicas
-	subset.Status.UpdatedReadyReplicas = statusUpdatedReadyReplicas
-
-	subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set)
-	subset.Spec.SubsetPods = pods
+	subset.Status.Replicas = m.adapter.GetStatusReplicas(set)
+	subset.Status.ReadyReplicas = m.adapter.GetStatusReadyReplicas(set)
+	subset.Status.UpdatedReplicas, subset.Status.UpdatedReadyReplicas = adapter.CalculateUpdatedReplicas(pods, updatedRevision)
 
 	return subset, nil
 }
diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller.go b/pkg/controller/uniteddeployment/uniteddeployment_controller.go
index 513b97c2e9..50f4abbc2c 100644
--- a/pkg/controller/uniteddeployment/uniteddeployment_controller.go
+++ b/pkg/controller/uniteddeployment/uniteddeployment_controller.go
@@ -184,14 +184,6 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
 		}
 		return reconcile.Result{}, err
 	}
-	if instance.DeletionTimestamp == nil && !controllerutil.ContainsFinalizer(instance, UnitedDeploymentFinalizer) {
-		klog.InfoS("adding UnitedDeploymentFinalizer")
-		controllerutil.AddFinalizer(instance, UnitedDeploymentFinalizer)
-		if err = r.updateUnitedDeploymentInstance(instance); err != nil {
-			klog.ErrorS(err, "Failed to add UnitedDeploymentFinalizer", "unitedDeployment", request)
-		}
-		return reconcile.Result{}, err
-	}
 
 	if instance.DeletionTimestamp != nil {
 		if controllerutil.RemoveFinalizer(instance, UnitedDeploymentFinalizer) {
@@ -205,6 +197,14 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
 		return reconcile.Result{}, err
 	}
 
+	if controllerutil.AddFinalizer(instance, UnitedDeploymentFinalizer) {
+		klog.InfoS("adding UnitedDeploymentFinalizer")
+		if err = r.updateUnitedDeploymentInstance(instance); err != nil {
+			klog.ErrorS(err, "Failed to add UnitedDeploymentFinalizer", "unitedDeployment", request)
+		}
+		return reconcile.Result{}, err
+	}
+
 	// make sure latest version is observed
 	ResourceVersionExpectation.Observe(instance)
 	if satisfied, _ := ResourceVersionExpectation.IsSatisfied(instance); !satisfied {
@@ -339,7 +339,7 @@ func manageUnschedulableStatusForExistingSubset(name string, subset *Subset, ud
 	// Maybe there exist some pending pods because the subset is unschedulable.
 	if subset.Status.ReadyReplicas < subset.Status.Replicas {
 		for _, pod := range subset.Spec.SubsetPods {
-			timeouted, checkAfter := utilcontroller.PodPendingTimeout(pod, ud.Spec.Topology.ScheduleStrategy.GetRescheduleCriticalDuration())
+			timeouted, checkAfter := utilcontroller.GetTimeBeforePendingTimeout(pod, ud.Spec.Topology.ScheduleStrategy.GetRescheduleCriticalDuration())
 			if timeouted {
 				subset.Status.UnschedulableStatus.PendingPods++
 			}
diff --git a/pkg/controller/util/pod_condition_utils.go b/pkg/controller/util/pod_condition_utils.go
index ad5ff48ae2..c2a69d339e 100644
--- a/pkg/controller/util/pod_condition_utils.go
+++ b/pkg/controller/util/pod_condition_utils.go
@@ -24,9 +24,9 @@ func UpdateMessageKvCondition(kv map[string]interface{}, condition *v1.PodCondit
 	condition.Message = string(message)
 }
 
-// PodPendingTimeout return true when Pod was scheduled failed and timeout.
+// GetTimeBeforePendingTimeout returns true when the Pod failed to schedule and has timed out.
 // nextCheckAfter > 0 means the pod is failed to schedule but not timeout yet.
-func PodPendingTimeout(pod *v1.Pod, timeout time.Duration) (timeouted bool, nextCheckAfter time.Duration) {
+func GetTimeBeforePendingTimeout(pod *v1.Pod, timeout time.Duration) (timeouted bool, nextCheckAfter time.Duration) {
 	if pod.DeletionTimestamp != nil || pod.Status.Phase != v1.PodPending || pod.Spec.NodeName != "" {
 		return false, -1
 	}
diff --git a/pkg/controller/workloadspread/reschedule.go b/pkg/controller/workloadspread/reschedule.go
index 7ac8c30d97..a429053676 100644
--- a/pkg/controller/workloadspread/reschedule.go
+++ b/pkg/controller/workloadspread/reschedule.go
@@ -112,7 +112,7 @@ func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadS
 
 // PodUnscheduledTimeout return true when Pod was scheduled failed and timeout.
 func PodUnscheduledTimeout(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod) bool {
-	timeouted, nextCheckAfter := util.PodPendingTimeout(pod, time.Second*time.Duration(*ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds))
+	timeouted, nextCheckAfter := util.GetTimeBeforePendingTimeout(pod, time.Second*time.Duration(*ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds))
 	if nextCheckAfter > 0 {
 		durationStore.Push(getWorkloadSpreadKey(ws), nextCheckAfter)
 	}
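
[Editorial note — not part of the patch] The Reconcile reordering above leans on the fact that controller-runtime's controllerutil.AddFinalizer and RemoveFinalizer report whether they actually mutated the object. That is why the old `DeletionTimestamp == nil && !ContainsFinalizer` guard collapses into a single call, and the API update is issued only when something changed. A minimal sketch of the resulting idiom:

	// Sketch only: AddFinalizer returns true only when the finalizer was absent.
	if controllerutil.AddFinalizer(instance, UnitedDeploymentFinalizer) {
		// The finalizer was just added in memory; persist it and end this
		// reconcile, letting the resulting update event trigger the next pass.
		err := r.updateUnitedDeploymentInstance(instance)
		return reconcile.Result{}, err
	}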