Skip to content

Commit

Permalink
refactor subset adapter
Browse files Browse the repository at this point in the history
Signed-off-by: AiRanthem <[email protected]>
  • Loading branch information
AiRanthem committed Sep 25, 2024
1 parent 300e67c commit 79686a4
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 109 deletions.
13 changes: 10 additions & 3 deletions pkg/controller/uniteddeployment/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@ type Adapter interface {
NewResourceListObject() client.ObjectList
// GetStatusObservedGeneration returns the observed generation of the subset.
GetStatusObservedGeneration(subset metav1.Object) int64
// GetReplicaDetails returns the replicas information of the subset status.
GetReplicaDetails(subset metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas,
statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error)
// GetSubsetPods returns all pods of the subset workload.
GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error)
// GetSpecReplicas returns the replicas information of the subset workload.
GetSpecReplicas(obj metav1.Object) *int32
// GetSpecPartition returns the partition information of the subset workload if possible.
GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32
// GetStatusReplicas returns the replicas from the subset workload status.
GetStatusReplicas(obj metav1.Object) int32
// GetStatusReadyReplicas returns the ready replicas information from the subset workload status.
GetStatusReadyReplicas(obj metav1.Object) int32
// GetSubsetFailure returns failure information of the subset.
GetSubsetFailure() *string
// ApplySubsetTemplate updates the subset to the latest revision.
Expand Down
17 changes: 17 additions & 0 deletions pkg/controller/uniteddeployment/adapter/adapter_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)
Expand Down Expand Up @@ -97,3 +98,19 @@ func getCurrentPartition(pods []*corev1.Pod, revision string) *int32 {

return &partition
}

func CalculateUpdatedReplicas(podList []*corev1.Pod, updatedRevision string) (updatedReplicas, updatedReadyReplicas int32) {
for _, pod := range podList {
revision := getRevision(&pod.ObjectMeta)

Check warning on line 104 in pkg/controller/uniteddeployment/adapter/adapter_util.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/adapter_util.go#L102-L104

Added lines #L102 - L104 were not covered by tests

// Only count pods that are updated and are not terminating
if revision == updatedRevision && pod.GetDeletionTimestamp() == nil {
updatedReplicas++
if podutil.IsPodReady(pod) {
updatedReadyReplicas++

Check warning on line 110 in pkg/controller/uniteddeployment/adapter/adapter_util.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/adapter_util.go#L107-L110

Added lines #L107 - L110 were not covered by tests
}
}
}

return

Check warning on line 115 in pkg/controller/uniteddeployment/adapter/adapter_util.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/adapter_util.go#L115

Added line #L115 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,32 @@ func (a *AdvancedStatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Obje
return obj.(*v1beta1.StatefulSet).Status.ObservedGeneration
}

// GetReplicaDetails returns the replicas detail the subset needs.
func (a *AdvancedStatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {
set := obj.(*v1beta1.StatefulSet)
pods, err = a.getStatefulSetPods(set)
if err != nil {
return
}
func (a *AdvancedStatefulSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
return a.getStatefulSetPods(obj.(*v1beta1.StatefulSet))

Check warning on line 66 in pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go#L65-L66

Added lines #L65 - L66 were not covered by tests
}

func (a *AdvancedStatefulSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
return obj.(*v1beta1.StatefulSet).Spec.Replicas

Check warning on line 70 in pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

specReplicas = set.Spec.Replicas
func (a *AdvancedStatefulSetAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 {
set := obj.(*v1beta1.StatefulSet)

Check warning on line 74 in pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go#L73-L74

Added lines #L73 - L74 were not covered by tests
if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
revision := getRevision(&set.ObjectMeta)
specPartition = getCurrentPartition(pods, revision)
return getCurrentPartition(pods, revision)

Check warning on line 77 in pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go#L77

Added line #L77 was not covered by tests
} else if set.Spec.UpdateStrategy.RollingUpdate != nil &&
set.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
specPartition = set.Spec.UpdateStrategy.RollingUpdate.Partition
return set.Spec.UpdateStrategy.RollingUpdate.Partition

Check warning on line 80 in pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go#L80

Added line #L80 was not covered by tests
}
return nil

Check warning on line 82 in pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go#L82

Added line #L82 was not covered by tests
}

statusReplicas = set.Status.Replicas
statusReadyReplicas = set.Status.ReadyReplicas
statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
func (a *AdvancedStatefulSetAdapter) GetStatusReplicas(obj metav1.Object) int32 {
return obj.(*v1beta1.StatefulSet).Status.Replicas

Check warning on line 86 in pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}

return
func (a *AdvancedStatefulSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
return obj.(*v1beta1.StatefulSet).Status.ReadyReplicas

Check warning on line 90 in pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go#L89-L90

Added lines #L89 - L90 were not covered by tests
}

// GetSubsetFailure returns the failure information of the subset.
Expand Down
33 changes: 17 additions & 16 deletions pkg/controller/uniteddeployment/adapter/cloneset_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,29 @@ func (a *CloneSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 {
return obj.(*alpha1.CloneSet).Status.ObservedGeneration
}

func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {

set := obj.(*alpha1.CloneSet)

pods, err = a.getCloneSetPods(set)

if err != nil {
return
}
func (a *CloneSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
return a.getCloneSetPods(obj.(*alpha1.CloneSet))

Check warning on line 45 in pkg/controller/uniteddeployment/adapter/cloneset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/cloneset_adapter.go#L44-L45

Added lines #L44 - L45 were not covered by tests
}

specReplicas = set.Spec.Replicas
func (a *CloneSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
return obj.(*alpha1.CloneSet).Spec.Replicas

Check warning on line 49 in pkg/controller/uniteddeployment/adapter/cloneset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/cloneset_adapter.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}

func (a *CloneSetAdapter) GetSpecPartition(obj metav1.Object, _ []*corev1.Pod) *int32 {
set := obj.(*alpha1.CloneSet)

Check warning on line 53 in pkg/controller/uniteddeployment/adapter/cloneset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/cloneset_adapter.go#L52-L53

Added lines #L52 - L53 were not covered by tests
if set.Spec.UpdateStrategy.Partition != nil {
partition, _ := intstr.GetValueFromIntOrPercent(set.Spec.UpdateStrategy.Partition, int(*set.Spec.Replicas), true)
specPartition = utilpointer.Int32Ptr(int32(partition))
partition, _ := intstr.GetScaledValueFromIntOrPercent(set.Spec.UpdateStrategy.Partition, int(*set.Spec.Replicas), true)
return utilpointer.Int32Ptr(int32(partition))

Check warning on line 56 in pkg/controller/uniteddeployment/adapter/cloneset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/cloneset_adapter.go#L55-L56

Added lines #L55 - L56 were not covered by tests
}
return nil

Check warning on line 58 in pkg/controller/uniteddeployment/adapter/cloneset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/cloneset_adapter.go#L58

Added line #L58 was not covered by tests
}

statusReplicas = set.Status.Replicas
statusReadyReplicas = set.Status.ReadyReplicas
statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
func (a *CloneSetAdapter) GetStatusReplicas(obj metav1.Object) int32 {
return obj.(*alpha1.CloneSet).Status.Replicas

Check warning on line 62 in pkg/controller/uniteddeployment/adapter/cloneset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/cloneset_adapter.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}

return
func (a *CloneSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
return obj.(*alpha1.CloneSet).Status.ReadyReplicas

Check warning on line 66 in pkg/controller/uniteddeployment/adapter/cloneset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/cloneset_adapter.go#L65-L66

Added lines #L65 - L66 were not covered by tests
}

func (a *CloneSetAdapter) GetSubsetFailure() *string {
Expand Down
32 changes: 18 additions & 14 deletions pkg/controller/uniteddeployment/adapter/deployment_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,28 @@ func (a *DeploymentAdapter) GetStatusObservedGeneration(obj metav1.Object) int64
return obj.(*appsv1.Deployment).Status.ObservedGeneration
}

// GetReplicaDetails returns the replicas detail the subset needs.
func (a *DeploymentAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {
// Convert to Deployment Object
func (a *DeploymentAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {

Check warning on line 60 in pkg/controller/uniteddeployment/adapter/deployment_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/deployment_adapter.go#L60

Added line #L60 was not covered by tests
set := obj.(*appsv1.Deployment)
return a.getDeploymentPods(set)

Check warning on line 62 in pkg/controller/uniteddeployment/adapter/deployment_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/deployment_adapter.go#L62

Added line #L62 was not covered by tests
}

// Get all pods belonging to deployment
pods, err = a.getDeploymentPods(set)
if err != nil {
return
}
func (a *DeploymentAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
set := obj.(*appsv1.Deployment)
return set.Spec.Replicas

Check warning on line 67 in pkg/controller/uniteddeployment/adapter/deployment_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/deployment_adapter.go#L65-L67

Added lines #L65 - L67 were not covered by tests
}

// Set according replica counts
specReplicas = set.Spec.Replicas
statusReplicas = set.Status.Replicas
statusReadyReplicas = set.Status.ReadyReplicas
statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
func (a *DeploymentAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 {
return nil

Check warning on line 71 in pkg/controller/uniteddeployment/adapter/deployment_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/deployment_adapter.go#L70-L71

Added lines #L70 - L71 were not covered by tests
}

func (a *DeploymentAdapter) GetStatusReplicas(obj metav1.Object) int32 {
set := obj.(*appsv1.Deployment)
return set.Status.Replicas

Check warning on line 76 in pkg/controller/uniteddeployment/adapter/deployment_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/deployment_adapter.go#L74-L76

Added lines #L74 - L76 were not covered by tests
}

return
func (a *DeploymentAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
set := obj.(*appsv1.Deployment)
return set.Status.ReadyReplicas

Check warning on line 81 in pkg/controller/uniteddeployment/adapter/deployment_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/deployment_adapter.go#L79-L81

Added lines #L79 - L81 were not covered by tests
}

// GetSubsetFailure returns the failure information of the subset.
Expand Down
48 changes: 18 additions & 30 deletions pkg/controller/uniteddeployment/adapter/statefulset_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,32 @@ func (a *StatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int6
return obj.(*appsv1.StatefulSet).Status.ObservedGeneration
}

// GetReplicaDetails returns the replicas detail the subset needs.
func (a *StatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) {
set := obj.(*appsv1.StatefulSet)
pods, err = a.getStatefulSetPods(set)
if err != nil {
return
}
func (a *StatefulSetAdapter) GetSubsetPods(obj metav1.Object) ([]*corev1.Pod, error) {
return a.getStatefulSetPods(obj.(*appsv1.StatefulSet))

Check warning on line 62 in pkg/controller/uniteddeployment/adapter/statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/statefulset_adapter.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}

specReplicas = set.Spec.Replicas
func (a *StatefulSetAdapter) GetSpecReplicas(obj metav1.Object) *int32 {
return obj.(*appsv1.StatefulSet).Spec.Replicas

Check warning on line 66 in pkg/controller/uniteddeployment/adapter/statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/statefulset_adapter.go#L65-L66

Added lines #L65 - L66 were not covered by tests
}

func (a *StatefulSetAdapter) GetSpecPartition(obj metav1.Object, pods []*corev1.Pod) *int32 {
set := obj.(*appsv1.StatefulSet)

Check warning on line 70 in pkg/controller/uniteddeployment/adapter/statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/statefulset_adapter.go#L69-L70

Added lines #L69 - L70 were not covered by tests
if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
revision := getRevision(&set.ObjectMeta)
specPartition = getCurrentPartition(pods, revision)
return getCurrentPartition(pods, revision)

Check warning on line 73 in pkg/controller/uniteddeployment/adapter/statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/statefulset_adapter.go#L73

Added line #L73 was not covered by tests
} else if set.Spec.UpdateStrategy.RollingUpdate != nil &&
set.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
specPartition = set.Spec.UpdateStrategy.RollingUpdate.Partition
return set.Spec.UpdateStrategy.RollingUpdate.Partition

Check warning on line 76 in pkg/controller/uniteddeployment/adapter/statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/statefulset_adapter.go#L76

Added line #L76 was not covered by tests
}
return nil

Check warning on line 78 in pkg/controller/uniteddeployment/adapter/statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/statefulset_adapter.go#L78

Added line #L78 was not covered by tests
}

statusReplicas = set.Status.Replicas
statusReadyReplicas = set.Status.ReadyReplicas
statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
func (a *StatefulSetAdapter) GetStatusReplicas(obj metav1.Object) int32 {
return obj.(*appsv1.StatefulSet).Status.Replicas

Check warning on line 82 in pkg/controller/uniteddeployment/adapter/statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/statefulset_adapter.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}

return
func (a *StatefulSetAdapter) GetStatusReadyReplicas(obj metav1.Object) int32 {
return obj.(*appsv1.StatefulSet).Status.ReadyReplicas

Check warning on line 86 in pkg/controller/uniteddeployment/adapter/statefulset_adapter.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/adapter/statefulset_adapter.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}

// GetSubsetFailure returns the failure information of the subset.
Expand Down Expand Up @@ -231,22 +235,6 @@ func (a *StatefulSetAdapter) getStatefulSetPods(set *appsv1.StatefulSet) ([]*cor
return claimedPods, nil
}

func calculateUpdatedReplicas(podList []*corev1.Pod, updatedRevision string) (updatedReplicas, updatedReadyReplicas int32) {
for _, pod := range podList {
revision := getRevision(&pod.ObjectMeta)

// Only count pods that are updated and are not terminating
if revision == updatedRevision && pod.GetDeletionTimestamp() == nil {
updatedReplicas++
if podutil.IsPodReady(pod) {
updatedReadyReplicas++
}
}
}

return
}

// deleteStuckPods tries to work around the blocking issue https://github.com/kubernetes/kubernetes/issues/67250
func (a *StatefulSetAdapter) deleteStuckPods(set *appsv1.StatefulSet, revision string, partition int32) error {
pods, err := a.getStatefulSetPods(set)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/uniteddeployment/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func getNotPendingReplicasMap(nameToSubset *map[string]*Subset) map[string]int32
return result
}

func getSubSetUnschedulable(name string, nameToSubset *map[string]*Subset) (unschedulable bool) {
func isSubSetUnschedulable(name string, nameToSubset *map[string]*Subset) (unschedulable bool) {
if subsetObj, ok := (*nameToSubset)[name]; ok {
unschedulable = subsetObj.Status.UnschedulableStatus.Unschedulable
} else {
Expand Down Expand Up @@ -303,7 +303,7 @@ func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameTo
maxReplicas, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas)
}
if ac.Spec.Topology.ScheduleStrategy.IsAdaptive() {
unschedulable := getSubSetUnschedulable(subset.Name, nameToSubset)
unschedulable := isSubSetUnschedulable(subset.Name, nameToSubset)
// This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up.
if notPendingReplicas, ok := notPendingReplicasMap[subset.Name]; unschedulable && ok {
klog.InfoS("Assign min(notPendingReplicas, minReplicas/maxReplicas) for unschedulable subset",
Expand Down
33 changes: 15 additions & 18 deletions pkg/controller/uniteddeployment/subset_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,6 @@ func (m *SubsetControl) IsExpected(subSet *Subset, revision string) bool {
}

func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision string) (*Subset, error) {
subSetName, err := getSubsetNameFrom(set)
if err != nil {
return nil, err
}

subset := &Subset{}
subset.ObjectMeta = metav1.ObjectMeta{
Name: set.GetName(),
Expand All @@ -154,30 +149,32 @@ func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision strin
OwnerReferences: set.GetOwnerReferences(),
Finalizers: set.GetFinalizers(),
}
subset.Spec.SubsetName = subSetName

specReplicas, specPartition, statusReplicas,
statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas, pods, err := m.adapter.GetReplicaDetails(set, updatedRevision)
pods, err := m.adapter.GetSubsetPods(set)
if err != nil {
return nil, err

Check warning on line 155 in pkg/controller/uniteddeployment/subset_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/subset_control.go#L155

Added line #L155 was not covered by tests
}
subset.Spec.SubsetPods = pods

subSetName, err := getSubsetNameFrom(set)
if err != nil {
return subset, err
return nil, err

Check warning on line 161 in pkg/controller/uniteddeployment/subset_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/subset_control.go#L161

Added line #L161 was not covered by tests
}
subset.Spec.SubsetName = subSetName

if specReplicas != nil {
if specReplicas := m.adapter.GetSpecReplicas(set); specReplicas != nil {
subset.Spec.Replicas = *specReplicas
}

if specPartition != nil {
if specPartition := m.adapter.GetSpecPartition(set, pods); specPartition != nil {
subset.Spec.UpdateStrategy.Partition = *specPartition
}
subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set)

subset.Status.ObservedGeneration = m.adapter.GetStatusObservedGeneration(set)
subset.Status.Replicas = statusReplicas
subset.Status.ReadyReplicas = statusReadyReplicas
subset.Status.UpdatedReplicas = statusUpdatedReplicas
subset.Status.UpdatedReadyReplicas = statusUpdatedReadyReplicas

subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set)
subset.Spec.SubsetPods = pods
subset.Status.Replicas = m.adapter.GetStatusReplicas(set)
subset.Status.ReadyReplicas = m.adapter.GetStatusReadyReplicas(set)
subset.Status.UpdatedReplicas, subset.Status.UpdatedReadyReplicas = adapter.CalculateUpdatedReplicas(pods, updatedRevision)

return subset, nil
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/controller/uniteddeployment/uniteddeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,6 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
}
return reconcile.Result{}, err
}
if instance.DeletionTimestamp == nil && !controllerutil.ContainsFinalizer(instance, UnitedDeploymentFinalizer) {
klog.InfoS("adding UnitedDeploymentFinalizer")
controllerutil.AddFinalizer(instance, UnitedDeploymentFinalizer)
if err = r.updateUnitedDeploymentInstance(instance); err != nil {
klog.ErrorS(err, "Failed to add UnitedDeploymentFinalizer", "unitedDeployment", request)
}
return reconcile.Result{}, err
}

if instance.DeletionTimestamp != nil {
if controllerutil.RemoveFinalizer(instance, UnitedDeploymentFinalizer) {
Expand All @@ -205,6 +197,14 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
return reconcile.Result{}, err
}

if controllerutil.AddFinalizer(instance, UnitedDeploymentFinalizer) {
klog.InfoS("adding UnitedDeploymentFinalizer")
if err = r.updateUnitedDeploymentInstance(instance); err != nil {
klog.ErrorS(err, "Failed to add UnitedDeploymentFinalizer", "unitedDeployment", request)

Check warning on line 203 in pkg/controller/uniteddeployment/uniteddeployment_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/uniteddeployment/uniteddeployment_controller.go#L203

Added line #L203 was not covered by tests
}
return reconcile.Result{}, err
}

// make sure latest version is observed
ResourceVersionExpectation.Observe(instance)
if satisfied, _ := ResourceVersionExpectation.IsSatisfied(instance); !satisfied {
Expand Down Expand Up @@ -339,7 +339,7 @@ func manageUnschedulableStatusForExistingSubset(name string, subset *Subset, ud
// Maybe there exist some pending pods because the subset is unschedulable.
if subset.Status.ReadyReplicas < subset.Status.Replicas {
for _, pod := range subset.Spec.SubsetPods {
timeouted, checkAfter := utilcontroller.PodPendingTimeout(pod, ud.Spec.Topology.ScheduleStrategy.GetRescheduleCriticalDuration())
timeouted, checkAfter := utilcontroller.GetTimeBeforePendingTimeout(pod, ud.Spec.Topology.ScheduleStrategy.GetRescheduleCriticalDuration())
if timeouted {
subset.Status.UnschedulableStatus.PendingPods++
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/util/pod_condition_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func UpdateMessageKvCondition(kv map[string]interface{}, condition *v1.PodCondit
condition.Message = string(message)
}

// PodPendingTimeout return true when Pod was scheduled failed and timeout.
// GetTimeBeforePendingTimeout return true when Pod was scheduled failed and timeout.
// nextCheckAfter > 0 means the pod is failed to schedule but not timeout yet.
func PodPendingTimeout(pod *v1.Pod, timeout time.Duration) (timeouted bool, nextCheckAfter time.Duration) {
func GetTimeBeforePendingTimeout(pod *v1.Pod, timeout time.Duration) (timeouted bool, nextCheckAfter time.Duration) {
if pod.DeletionTimestamp != nil || pod.Status.Phase != v1.PodPending || pod.Spec.NodeName != "" {
return false, -1
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/workloadspread/reschedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadS

// PodUnscheduledTimeout return true when Pod was scheduled failed and timeout.
func PodUnscheduledTimeout(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod) bool {
timeouted, nextCheckAfter := util.PodPendingTimeout(pod, time.Second*time.Duration(*ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds))
timeouted, nextCheckAfter := util.GetTimeBeforePendingTimeout(pod, time.Second*time.Duration(*ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds))
if nextCheckAfter > 0 {
durationStore.Push(getWorkloadSpreadKey(ws), nextCheckAfter)

Check warning on line 117 in pkg/controller/workloadspread/reschedule.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workloadspread/reschedule.go#L117

Added line #L117 was not covered by tests
}
Expand Down

0 comments on commit 79686a4

Please sign in to comment.