diff --git a/pkg/kwok/controllers/controller.go b/pkg/kwok/controllers/controller.go
index 5970af4ab..2e118edbf 100644
--- a/pkg/kwok/controllers/controller.go
+++ b/pkg/kwok/controllers/controller.go
@@ -25,9 +25,10 @@ import (
 	"strings"
 	"time"
 
+	coordinationv1 "k8s.io/api/coordination/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -155,34 +156,24 @@ func (c *Controller) Start(ctx context.Context) error {
 	recorder := c.broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"})
 
 	var (
+		err                   error
 		nodeLeases            *NodeLeaseController
+		nodeLeasesCh          chan Event[*coordinationv1.Lease]
 		getNodeOwnerFunc      func(nodeName string) []metav1.OwnerReference
 		onLeaseNodeManageFunc func(nodeName string)
 		onNodeManagedFunc     func(nodeName string)
 		readOnlyFunc          func(nodeName string) bool
-		nodeSelectorFunc      func(node *corev1.Node) bool
 	)
-	switch {
-	case conf.ManageAllNodes:
-		nodeSelectorFunc = func(node *corev1.Node) bool {
-			return true
-		}
-	case conf.ManageNodesWithAnnotationSelector != "":
-		selector, err := labels.Parse(conf.ManageNodesWithAnnotationSelector)
+
+	if conf.NodeLeaseDurationSeconds != 0 {
+		nodeLeasesCh = make(chan Event[*coordinationv1.Lease], 64)
+		nodeLeasesCli := conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)
+		nodeLeasesInformer := NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
+		err = nodeLeasesInformer.Watch(ctx, Option{}, nodeLeasesCh)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to watch node leases: %w", err)
 		}
-		nodeSelectorFunc = func(node *corev1.Node) bool {
-			return selector.Matches(labels.Set(node.Annotations))
-		}
-	case conf.ManageNodesWithLabelSelector != "":
-		// client-go supports label filtering, so return true is ok.
-		nodeSelectorFunc = func(node *corev1.Node) bool {
-			return true
-		}
-	}
-	if conf.NodeLeaseDurationSeconds != 0 {
 		leaseDuration := time.Duration(conf.NodeLeaseDurationSeconds) * time.Second
 		// https://github.com/kubernetes/kubernetes/blob/02f4d643eae2e225591702e1bbf432efea453a26/pkg/kubelet/kubelet.go#L199-L200
 		renewInterval := leaseDuration / 4
@@ -195,7 +186,6 @@ func (c *Controller) Start(ctx context.Context) error {
 			LeaseParallelism:    conf.NodeLeaseParallelism,
 			RenewInterval:       renewInterval,
 			RenewIntervalJitter: renewIntervalJitter,
-			LeaseNamespace:      corev1.NamespaceNodeLease,
 			MutateLeaseFunc: setNodeOwnerFunc(func(nodeName string) []metav1.OwnerReference {
 				return getNodeOwnerFunc(nodeName)
 			}),
@@ -289,6 +279,27 @@ func (c *Controller) Start(ctx context.Context) error {
 		nodeLifecycleGetter = resources.NewStaticGetter(lifecycle)
 	}
 
+	nodeCh := make(chan Event[*corev1.Node], 64)
+	nodesCli := conf.TypedClient.CoreV1().Nodes()
+	nodesInformer := NewInformer[*corev1.Node, *corev1.NodeList](nodesCli)
+	err = nodesInformer.Watch(ctx, Option{
+		LabelSelector:      conf.ManageNodesWithLabelSelector,
+		AnnotationSelector: conf.ManageNodesWithAnnotationSelector,
+	}, nodeCh)
+	if err != nil {
+		return fmt.Errorf("failed to watch nodes: %w", err)
+	}
+
+	podsCh := make(chan Event[*corev1.Pod], 64)
+	podsCli := conf.TypedClient.CoreV1().Pods(corev1.NamespaceAll)
+	podsInformer := NewInformer[*corev1.Pod, *corev1.PodList](podsCli)
+	err = podsInformer.Watch(ctx, Option{
+		FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", "").String(),
+	}, podsCh)
+	if err != nil {
+		return fmt.Errorf("failed to watch pods: %w", err)
+	}
+
 	nodes, err := NewNodeController(NodeControllerConfig{
 		Clock:       conf.Clock,
 		TypedClient: conf.TypedClient,
@@ -297,8 +308,6 @@ func (c *Controller) Start(ctx context.Context) error {
 		NodePort:                              conf.NodePort,
 		DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector,
 		DisregardStatusWithLabelSelector:      conf.DisregardStatusWithLabelSelector,
-		ManageNodesWithLabelSelector:          conf.ManageNodesWithLabelSelector,
-		NodeSelectorFunc:                      nodeSelectorFunc,
 		OnNodeManagedFunc: func(nodeName string) {
 			onNodeManagedFunc(nodeName)
 		},
@@ -323,7 +332,6 @@ func (c *Controller) Start(ctx context.Context) error {
 		DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
 		Lifecycle:                        podLifecycleGetter,
 		PlayStageParallelism:             conf.PodPlayStageParallelism,
-		Namespace:                        corev1.NamespaceAll,
 		NodeGetFunc:                      nodes.Get,
 		FuncMap:                          defaultFuncMap,
 		Recorder:                         recorder,
@@ -344,8 +352,19 @@ func (c *Controller) Start(ctx context.Context) error {
 	}
 	onLeaseNodeManageFunc = func(nodeName string) {
 		// Manage the node and play stage all pods on the node
-		nodes.Manage(nodeName)
-		pods.PlayStagePodsOnNode(nodeName)
+		err := nodesInformer.Update(ctx, Option{
+			FieldSelector: fields.OneTermEqualSelector("metadata.name", nodeName).String(),
+		}, nodeCh)
+		if err != nil {
+			logger.Error("failed to update node", err, "node", nodeName)
+		}
+
+		err = podsInformer.Update(ctx, Option{
+			FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
+		}, podsCh)
+		if err != nil {
+			logger.Error("failed to update pods on node", err, "node", nodeName)
+		}
 	}
 
 	onNodeManagedFunc = func(nodeName string) {
@@ -355,22 +374,27 @@ func (c *Controller) Start(ctx context.Context) error {
 	} else {
 		onNodeManagedFunc = func(nodeName string) {
 			// Play stage all pods on the node
-			pods.PlayStagePodsOnNode(nodeName)
+			err := podsInformer.Update(ctx, Option{
+				FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
+			}, podsCh)
+			if err != nil {
+				logger.Error("failed to update pods on node", err, "node", nodeName)
+			}
 		}
 	}
 
 	c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.typedClient.CoreV1().Events("")})
 	if nodeLeases != nil {
-		err := nodeLeases.Start(ctx)
+		err := nodeLeases.Start(ctx, nodeLeasesCh)
 		if err != nil {
 			return fmt.Errorf("failed to start node leases controller: %w", err)
 		}
 	}
-	err = pods.Start(ctx)
+	err = pods.Start(ctx, podsCh)
 	if err != nil {
 		return fmt.Errorf("failed to start pods controller: %w", err)
 	}
-	err = nodes.Start(ctx)
+	err = nodes.Start(ctx, nodeCh)
 	if err != nil {
 		return fmt.Errorf("failed to start nodes controller: %w", err)
 	}
diff --git a/pkg/kwok/controllers/controller_test.go b/pkg/kwok/controllers/controller_test.go
index 441dc3121..b70982aa7 100644
--- a/pkg/kwok/controllers/controller_test.go
+++ b/pkg/kwok/controllers/controller_test.go
@@ -147,16 +147,16 @@ func TestController(t *testing.T) {
 		},
 	}
 
-	ctx := context.Background()
-	ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug))
-	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
-	t.Cleanup(func() {
-		cancel()
-		time.Sleep(time.Second)
-	})
-
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			ctx := context.Background()
+			ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug))
+			ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+			t.Cleanup(func() {
+				cancel()
+				time.Sleep(time.Second)
+			})
+
 			ctr, err := NewController(tt.conf)
 			if (err != nil) != tt.wantErr {
 				t.Fatalf("NewController() error = %v, wantErr %v", err, tt.wantErr)
@@ -180,7 +180,7 @@ func TestController(t *testing.T) {
 				}
 			}
 			return true, nil
-		}, wait.WithContinueOnError(5))
+		}, wait.WithContinueOnError(10))
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/pkg/kwok/controllers/event.go b/pkg/kwok/controllers/event.go
new file mode 100644
index 000000000..31ed82367
--- /dev/null
+++ b/pkg/kwok/controllers/event.go
@@ -0,0 +1,106 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+	"context"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+// EventType defines the possible types of events.
+type EventType string
+
+// Event types.
+const (
+	Added    EventType = "ADDED"
+	Modified EventType = "MODIFIED"
+	Deleted  EventType = "DELETED"
+	Sync     EventType = "SYNC"
+)
+
+// Event represents a single event to a watched resource.
+type Event[T runtime.Object] struct {
+	// Type is Added, Modified, Deleted, or Sync.
+	Type EventType
+
+	// Object is:
+	//  * If Type is Added, Modified or Sync: the new state of the object.
+	//  * If Type is Deleted: the state of the object immediately before deletion.
+	Object T
+}
+
+// ListWatcher is an interface for objects that know how to list and watch themselves.
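+//
+// Typed client-go interfaces already satisfy it; a sketch, assuming a
+// kubernetes.Interface named clientset:
+//
+//	var lw ListWatcher[*corev1.PodList] = clientset.CoreV1().Pods(corev1.NamespaceAll)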
+type ListWatcher[L runtime.Object] interface {
+	// List returns an object containing a list of the resources matching the provided options.
+	List(ctx context.Context, opts metav1.ListOptions) (L, error)
+	// Watch returns an object that watches the resources matching the provided options.
+	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
+}
+
+// Option is used to filter events.
+type Option struct {
+	LabelSelector      string
+	FieldSelector      string
+	AnnotationSelector string
+	annotationSelector labels.Selector
+}
+
+func (o Option) setup(opts *metav1.ListOptions) {
+	if o.LabelSelector != "" {
+		opts.LabelSelector = o.LabelSelector
+	}
+	if o.FieldSelector != "" {
+		opts.FieldSelector = o.FieldSelector
+	}
+}
+
+func (o Option) toListOptions() metav1.ListOptions {
+	opts := metav1.ListOptions{}
+	o.setup(&opts)
+	return opts
+}
+
+func (o Option) filter(obj any) (bool, error) {
+	if o.AnnotationSelector == "" {
+		return true, nil
+	}
+
+	if o.annotationSelector == nil {
+		var err error
+		o.annotationSelector, err = labels.Parse(o.AnnotationSelector)
+		if err != nil {
+			return false, err
+		}
+	}
+
+	accessor, err := meta.Accessor(obj)
+	if err != nil {
+		return false, err
+	}
+
+	annotations := accessor.GetAnnotations()
+	if len(annotations) == 0 {
+		return false, nil
+	}
+
+	return o.annotationSelector.Matches(labels.Set(annotations)), nil
+}
diff --git a/pkg/kwok/controllers/event_informer.go b/pkg/kwok/controllers/event_informer.go
new file mode 100644
index 000000000..41bb298e6
--- /dev/null
+++ b/pkg/kwok/controllers/event_informer.go
@@ -0,0 +1,240 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+	"context"
+	"fmt"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/pager"
+
+	"sigs.k8s.io/kwok/pkg/log"
+)
+
+// Informer is a wrapper around a ListFunc and WatchFunc that turns
+// list/watch results into a stream of events.
+type Informer[T runtime.Object, L runtime.Object] struct {
+	ListFunc  func(ctx context.Context, opts metav1.ListOptions) (L, error)
+	WatchFunc func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
+}
+
+// NewInformer returns a new Informer.
+func NewInformer[T runtime.Object, L runtime.Object](lw ListWatcher[L]) *Informer[T, L] {
+	return &Informer[T, L]{
+		ListFunc:  lw.List,
+		WatchFunc: lw.Watch,
+	}
+}
+
+// Watch starts a goroutine that watches the resource and sends events to the events channel.
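+//
+// A minimal usage sketch (ctx and a typed Pods client named podsCli are
+// assumed placeholders):
+//
+//	ch := make(chan Event[*corev1.Pod], 64)
+//	informer := NewInformer[*corev1.Pod, *corev1.PodList](podsCli)
+//	if err := informer.Watch(ctx, Option{}, ch); err != nil {
+//		return err
+//	}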
+func (i *Informer[T, L]) Watch(ctx context.Context, opt Option, events chan<- Event[T]) error {
+	var t T
+	informer := cache.NewReflectorWithOptions(
+		&cache.ListWatch{
+			ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+				opt.setup(&opts)
+				return i.ListFunc(ctx, opts)
+			},
+			WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+				opt.setup(&opts)
+				return i.WatchFunc(ctx, opts)
+			},
+		},
+		t,
+		dummyCache(events, opt),
+		cache.ReflectorOptions{},
+	)
+
+	go informer.Run(ctx.Done())
+	return nil
+}
+
+// WatchWithCache starts an informer that watches the resource, sends events to
+// the events channel, and caches the objects so they can be read back through
+// the returned Getter.
+func (i *Informer[T, L]) WatchWithCache(ctx context.Context, opt Option, events chan<- Event[T]) (Getter[T], error) {
+	var t T
+	logger := log.FromContext(ctx)
+	store, controller := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+				opt.setup(&opts)
+				return i.ListFunc(ctx, opts)
+			},
+			WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+				opt.setup(&opts)
+				return i.WatchFunc(ctx, opts)
+			},
+		},
+		t,
+		0,
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj any) {
+				if ok, err := opt.filter(obj); err != nil {
+					logger.Error("filtering object", err)
+					return
+				} else if !ok {
+					return
+				}
+				events <- Event[T]{Type: Added, Object: obj.(T)}
+			},
+			UpdateFunc: func(oldObj, newObj any) {
+				if ok, err := opt.filter(newObj); err != nil {
+					logger.Error("filtering object", err)
+					return
+				} else if !ok {
+					return
+				}
+				events <- Event[T]{Type: Modified, Object: newObj.(T)}
+			},
+			DeleteFunc: func(obj any) {
+				if ok, err := opt.filter(obj); err != nil {
+					logger.Error("filtering object", err)
+					return
+				} else if !ok {
+					return
+				}
+				events <- Event[T]{Type: Deleted, Object: obj.(T)}
+			},
+		},
+	)
+
+	go controller.Run(ctx.Done())
+
+	return &getter[T]{Store: store}, nil
+}
+
+// Update sends a sync event for each resource returned by the ListFunc.
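+//
+// The controller uses this to replay all pods bound to a single node; a
+// sketch, reusing the podsInformer and podsCh wired up in controller.go:
+//
+//	err := podsInformer.Update(ctx, Option{
+//		FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
+//	}, podsCh)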
+func (i *Informer[T, L]) Update(ctx context.Context, opt Option, events chan<- Event[T]) error {
+	listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
+		return i.ListFunc(ctx, opts)
+	})
+
+	err := listPager.EachListItem(ctx, opt.toListOptions(), func(obj runtime.Object) error {
+		if ok, err := opt.filter(obj); err != nil {
+			return err
+		} else if !ok {
+			return nil
+		}
+		events <- Event[T]{Type: Sync, Object: obj.(T)}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func dummyCache[T runtime.Object](ch chan<- Event[T], opt Option) cache.Store {
+	return &cache.FakeCustomStore{
+		AddFunc: func(obj any) error {
+			if ok, err := opt.filter(obj); err != nil {
+				return err
+			} else if !ok {
+				return nil
+			}
+			ch <- Event[T]{Type: Added, Object: obj.(T)}
+			return nil
+		},
+		UpdateFunc: func(obj any) error {
+			if ok, err := opt.filter(obj); err != nil {
+				return err
+			} else if !ok {
+				return nil
+			}
+			ch <- Event[T]{Type: Modified, Object: obj.(T)}
+			return nil
+		},
+		DeleteFunc: func(obj any) error {
+			if ok, err := opt.filter(obj); err != nil {
+				return err
+			} else if !ok {
+				return nil
+			}
+			ch <- Event[T]{Type: Deleted, Object: obj.(T)}
+			return nil
+		},
+		ReplaceFunc: func(list []any, resourceVersion string) error {
+			for _, obj := range list {
+				if ok, err := opt.filter(obj); err != nil {
+					return err
+				} else if !ok {
+					continue
+				}
+				ch <- Event[T]{Type: Sync, Object: obj.(T)}
+			}
+			return nil
+		},
+		ListFunc: func() []any {
+			panic("do not call")
+		},
+		ListKeysFunc: func() []string {
+			panic("do not call")
+		},
+		GetFunc: func(obj any) (item any, exists bool, err error) {
+			panic("do not call")
+		},
+		GetByKeyFunc: func(key string) (item any, exists bool, err error) {
+			panic("do not call")
+		},
+		ResyncFunc: func() error {
+			return nil
+		},
+	}
+}
+
+// Getter is a wrapper around a cache.Store that provides Get and List methods.
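+//
+// A sketch of reading back objects cached by WatchWithCache, assuming a nodes
+// informer and channel built as in the Watch example:
+//
+//	g, err := nodesInformer.WatchWithCache(ctx, Option{}, nodeCh)
+//	if err != nil {
+//		return err
+//	}
+//	node, err := g.Get("node0")
+//	if err == ErrNotFound {
+//		// the node is not in the cache
+//	}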
+type Getter[T runtime.Object] interface {
+	Get(name string) (T, error)
+	GetWithNamespace(name, namespace string) (T, error)
+	List() []T
+}
+
+type getter[T runtime.Object] struct {
+	cache.Store
+}
+
+// ErrNotFound is returned by Getter when the object is not found.
+var ErrNotFound = fmt.Errorf("not found")
+
+func (g *getter[T]) Get(name string) (t T, err error) {
+	obj, exists, err := g.GetByKey(name)
+	if err != nil {
+		return t, err
+	}
+	if !exists {
+		return t, ErrNotFound
+	}
+	return obj.(T), nil
+}
+
+func (g *getter[T]) GetWithNamespace(name, namespace string) (t T, err error) {
+	obj, exists, err := g.GetByKey(namespace + "/" + name)
+	if err != nil {
+		return t, err
+	}
+	if !exists {
+		return t, ErrNotFound
+	}
+	return obj.(T), nil
+}
+
+func (g *getter[T]) List() (list []T) {
+	for _, obj := range g.Store.List() {
+		list = append(list, obj.(T))
+	}
+	return list
+}
diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go
index ffd40f842..09fb94874 100644
--- a/pkg/kwok/controllers/node_controller.go
+++ b/pkg/kwok/controllers/node_controller.go
@@ -23,19 +23,15 @@ import (
 	"fmt"
 	"net"
 	"sync/atomic"
-	"time"
 
 	"github.com/wzshiming/cron"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
-	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/pager"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/utils/clock"
 	netutils "k8s.io/utils/net"
@@ -97,8 +93,6 @@ type NodeController struct {
 	nodePort                              int
 	disregardStatusWithAnnotationSelector labels.Selector
 	disregardStatusWithLabelSelector      labels.Selector
-	manageNodesWithLabelSelector          string
-	nodeSelectorFunc                      func(node *corev1.Node) bool
 	onNodeManagedFunc                     func(nodeName string)
 	nodesSets                             maps.SyncMap[string, *NodeInfo]
 	renderer                              gotpl.Renderer
@@ -110,7 +104,6 @@ type NodeController struct {
 	delayJobs             jobInfoMap
 	recorder              record.EventRecorder
 	readOnlyFunc          func(nodeName string) bool
-	triggerPreprocessChan chan string
 	enableMetrics         bool
 }
 
@@ -118,11 +111,9 @@ type NodeController struct {
 type NodeControllerConfig struct {
 	Clock                                 clock.Clock
 	TypedClient                           kubernetes.Interface
-	NodeSelectorFunc                      func(node *corev1.Node) bool
 	OnNodeManagedFunc                     func(nodeName string)
 	DisregardStatusWithAnnotationSelector string
 	DisregardStatusWithLabelSelector      string
-	ManageNodesWithLabelSelector          string
 	NodeIP                                string
 	NodeName                              string
 	NodePort                              int
@@ -166,10 +157,8 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
 	c := &NodeController{
 		clock:                                 conf.Clock,
 		typedClient:                           conf.TypedClient,
-		nodeSelectorFunc:                      conf.NodeSelectorFunc,
 		disregardStatusWithAnnotationSelector: disregardStatusWithAnnotationSelector,
 		disregardStatusWithLabelSelector:      disregardStatusWithLabelSelector,
-		manageNodesWithLabelSelector:          conf.ManageNodesWithLabelSelector,
 		onNodeManagedFunc:                     conf.OnNodeManagedFunc,
 		nodeIP:                                conf.NodeIP,
 		nodeName:                              conf.NodeName,
@@ -178,7 +167,6 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
 		lifecycle:             conf.Lifecycle,
 		playStageParallelism:  conf.PlayStageParallelism,
 		preprocessChan:        make(chan *corev1.Node),
-		triggerPreprocessChan: make(chan string, 64),
 		playStageChan:         make(chan resourceStageJob[*corev1.Node]),
 		recorder:              conf.Recorder,
 		readOnlyFunc:          conf.ReadOnlyFunc,
@@ -201,35 +189,20 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
 // Start starts the fake nodes controller
-// if nodeSelectorFunc is not nil, it will use it to determine if the node should be managed
-func (c *NodeController) Start(ctx context.Context) error {
+func (c *NodeController) Start(ctx context.Context, events <-chan Event[*corev1.Node]) error {
 	go c.preprocessWorker(ctx)
-	go c.triggerPreprocessWorker(ctx)
 	for i := uint(0); i < c.playStageParallelism; i++ {
 		go c.playStageWorker(ctx)
 	}
 
-	opt := metav1.ListOptions{
-		LabelSelector: c.manageNodesWithLabelSelector,
-	}
-	err := c.watchResources(ctx, opt)
+	err := c.watchResources(ctx, events)
 	if err != nil {
 		return fmt.Errorf("failed watch nodes: %w", err)
 	}
-
-	logger := log.FromContext(ctx)
-	go func() {
-		err = c.listResources(ctx, opt)
-		if err != nil {
-			logger.Error("Failed list nodes", err)
-		}
-	}()
 	return nil
 }
 
 func (c *NodeController) need(node *corev1.Node) bool {
-	if !c.nodeSelectorFunc(node) {
-		return false
-	}
 	if c.disregardStatusWithAnnotationSelector != nil &&
 		len(node.Annotations) != 0 &&
 		c.disregardStatusWithAnnotationSelector.Matches(labels.Set(node.Annotations)) {
@@ -245,40 +218,19 @@ func (c *NodeController) need(node *corev1.Node) bool {
 }
 
 // watchResources watch resources and send to preprocessChan
-func (c *NodeController) watchResources(ctx context.Context, opt metav1.ListOptions) error {
-	// Watch nodes in the cluster
-	watcher, err := c.typedClient.CoreV1().Nodes().Watch(ctx, opt)
-	if err != nil {
-		return err
-	}
-
+func (c *NodeController) watchResources(ctx context.Context, events <-chan Event[*corev1.Node]) error {
 	logger := log.FromContext(ctx)
 	go func() {
-		rc := watcher.ResultChan()
 	loop:
 		for {
 			select {
-			case event, ok := <-rc:
+			case event, ok := <-events:
 				if !ok {
-					logger.Warn("Watch channel has been stopped, retrying")
-					for {
-						watcher, err := c.typedClient.CoreV1().Nodes().Watch(ctx, opt)
-						if err == nil {
-							rc = watcher.ResultChan()
-							continue loop
-						}
-
-						logger.Error("Failed to watch nodes", err)
-						select {
-						case <-ctx.Done():
-							break loop
-						case <-c.clock.After(time.Second * 5):
-						}
-					}
+					break loop
 				}
 				switch event.Type {
-				case watch.Added:
-					node := event.Object.(*corev1.Node)
+				case Added, Modified, Sync:
+					node := event.Object
 					if c.need(node) {
 						c.putNodeInfo(node)
 						if c.onNodeManagedFunc != nil {
@@ -294,22 +246,8 @@ func (c *NodeController) watchResources(ctx context.Context, opt metav1.ListOpti
 							c.preprocessChan <- node
 						}
 					}
-				case watch.Modified:
-					node := event.Object.(*corev1.Node)
-					if c.need(node) {
-						c.putNodeInfo(node)
-						if c.readOnly(node.Name) {
-							logger.Debug("Skip node",
-								"reason", "read only",
-								"event", event.Type,
-								"node", node.Name,
-							)
-						} else {
-							c.preprocessChan <- node
-						}
-					}
-				case watch.Deleted:
-					node := event.Object.(*corev1.Node)
+				case Deleted:
+					node := event.Object
 					if _, has := c.nodesSets.Load(node.Name); has {
 						c.deleteNodeInfo(node)
 
@@ -322,7 +260,6 @@ func (c *NodeController) watchResources(ctx context.Context, opt metav1.ListOpti
 					}
 				}
 			case <-ctx.Done():
-				watcher.Stop()
 				break loop
 			}
 		}
@@ -331,31 +268,6 @@ func (c *NodeController) watchResources(ctx context.Context, opt metav1.ListOpti
 	return nil
 }
 
-// listResources lists all resources and sends to preprocessChan
-func (c *NodeController) listResources(ctx context.Context, opt metav1.ListOptions) error {
-	listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
-		return c.typedClient.CoreV1().Nodes().List(ctx, opts)
-	})
-
-	logger := log.FromContext(ctx)
-
-	return listPager.EachListItem(ctx, opt, func(obj runtime.Object) error {
-		node := obj.(*corev1.Node)
-		if c.need(node) {
-			c.putNodeInfo(node)
-			if c.readOnly(node.Name) {
-				logger.Debug("Skip node",
-					"node", node.Name,
-					"reason", "read only",
-				)
-			} else {
-				c.preprocessChan <- node
-			}
-		}
-		return nil
-	})
-}
-
 // finalizersModify modify finalizers of node
 func (c *NodeController) finalizersModify(ctx context.Context, node *corev1.Node, finalizers *internalversion.StageFinalizers) (*corev1.Node, error) {
 	ops := finalizersModify(node.Finalizers, finalizers)
@@ -427,34 +339,6 @@ func (c *NodeController) preprocessWorker(ctx context.Context) {
 	}
 }
 
-// triggerPreprocessWorker receives the resource from the triggerPreprocessChan and preprocess it
-func (c *NodeController) triggerPreprocessWorker(ctx context.Context) {
-	logger := log.FromContext(ctx)
-	for {
-		select {
-		case <-ctx.Done():
-			logger.Debug("Stop trigger preprocess worker")
-			return
-		case nodeName := <-c.triggerPreprocessChan:
-			nodeInfo, has := c.nodesSets.Load(nodeName)
-			if !has || nodeInfo.Node == nil {
-				logger.Warn("Node not found",
-					"node", nodeName,
-				)
-				continue
-			}
-			if c.readOnly(nodeInfo.Node.Name) {
-				logger.Debug("Skip node",
-					"node", nodeInfo.Node.Name,
-					"reason", "read only",
-				)
-			} else {
-				c.preprocessChan <- nodeInfo.Node
-			}
-		}
-	}
-}
-
 // preprocess the node and send it to the playStageWorker
 func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) error {
 	key := node.Name
@@ -683,11 +567,6 @@ func (c *NodeController) deleteNodeInfo(node *corev1.Node) {
 	c.nodesSets.Delete(node.Name)
 }
 
-// Manage manages the node
-func (c *NodeController) Manage(nodeName string) {
-	c.triggerPreprocessChan <- nodeName
-}
-
 // getNodeHostIPs returns the provided node's IP(s); either a single "primary IP" for the
 // node in a single-stack cluster, or a dual-stack pair of IPs in a dual-stack cluster
 // (for nodes that actually have dual-stack IPs). Among other things, the IPs returned
diff --git a/pkg/kwok/controllers/node_controller_test.go b/pkg/kwok/controllers/node_controller_test.go
index 8b86f785f..f210ca6f5 100644
--- a/pkg/kwok/controllers/node_controller_test.go
+++ b/pkg/kwok/controllers/node_controller_test.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"fmt"
 	"os"
-	"strings"
 	"testing"
 	"time"
 
@@ -42,6 +41,9 @@ func TestNodeController(t *testing.T) {
 		&corev1.Node{
 			ObjectMeta: metav1.ObjectMeta{
 				Name: "node0",
+				Annotations: map[string]string{
+					"node": "true",
+				},
 			},
 			Status: corev1.NodeStatus{
 				Addresses: []corev1.NodeAddress{
@@ -69,7 +71,7 @@ func TestNodeController(t *testing.T) {
 	)
 
 	nodeSelectorFunc := func(node *corev1.Node) bool {
-		return strings.HasPrefix(node.Name, "node")
+		return node.Annotations["node"] == "true"
 	}
 	nodeInit, _ := config.Unmarshal([]byte(nodefast.DefaultNodeInit))
@@ -79,7 +81,6 @@ func TestNodeController(t *testing.T) {
 	nodes, err := NewNodeController(NodeControllerConfig{
 		TypedClient:          clientset,
 		NodeIP:               "10.0.0.1",
-		NodeSelectorFunc:     nodeSelectorFunc,
 		Lifecycle:            resources.NewStaticGetter(lifecycle),
 		FuncMap:              defaultFuncMap,
 		PlayStageParallelism: 2,
@@ -89,13 +90,23 @@ func TestNodeController(t *testing.T) {
 	}
 	ctx := context.Background()
 	ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug))
-	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
 	t.Cleanup(func() {
 		cancel()
 		time.Sleep(time.Second)
 	})
 
-	err = nodes.Start(ctx)
+	nodeCh := make(chan Event[*corev1.Node], 1)
+	nodesCli := clientset.CoreV1().Nodes()
+	nodesInformer := NewInformer[*corev1.Node, *corev1.NodeList](nodesCli)
+	err = nodesInformer.Watch(ctx, Option{
+		AnnotationSelector: "node=true",
+	}, nodeCh)
+	if err != nil {
+		t.Fatal(fmt.Errorf("failed to watch nodes: %w", err))
+	}
+
+	err = nodes.Start(ctx, nodeCh)
 	if err != nil {
 		t.Fatal(fmt.Errorf("failed to start nodes controller: %w", err))
 	}
diff --git a/pkg/kwok/controllers/node_lease_controller.go b/pkg/kwok/controllers/node_lease_controller.go
index f108ef532..ac5162802 100644
--- a/pkg/kwok/controllers/node_lease_controller.go
+++ b/pkg/kwok/controllers/node_lease_controller.go
@@ -23,12 +23,10 @@ import (
 
 	"github.com/wzshiming/cron"
 	coordinationv1 "k8s.io/api/coordination/v1"
+	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/pager"
 	"k8s.io/utils/clock"
 
 	"sigs.k8s.io/kwok/pkg/log"
@@ -40,7 +38,6 @@ import (
 // NodeLeaseController is responsible for creating and renewing a lease object
 type NodeLeaseController struct {
 	typedClient          clientset.Interface
-	leaseNamespace       string
 	leaseDurationSeconds uint
 	leaseParallelism     uint
 	renewInterval        time.Duration
@@ -71,7 +68,6 @@ type NodeLeaseControllerConfig struct {
 	LeaseParallelism    uint
 	RenewInterval       time.Duration
 	RenewIntervalJitter float64
-	LeaseNamespace      string
 	MutateLeaseFunc     func(*coordinationv1.Lease) error
 	OnNodeManagedFunc   func(nodeName string)
 }
@@ -89,7 +85,6 @@ func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseControlle
 	c := &NodeLeaseController{
 		clock:                conf.Clock,
 		typedClient:          conf.TypedClient,
-		leaseNamespace:       conf.LeaseNamespace,
 		leaseDurationSeconds: conf.LeaseDurationSeconds,
 		leaseParallelism:     conf.LeaseParallelism,
 		renewInterval:        conf.RenewInterval,
@@ -105,68 +100,39 @@ func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseControlle
 }
 
 // Start starts the NodeLeaseController
-func (c *NodeLeaseController) Start(ctx context.Context) error {
+func (c *NodeLeaseController) Start(ctx context.Context, events <-chan Event[*coordinationv1.Lease]) error {
 	for i := uint(0); i < c.leaseParallelism; i++ {
 		go c.syncWorker(ctx)
 	}
 
-	opt := metav1.ListOptions{}
-	err := c.watchResources(ctx, opt)
+	err := c.watchResources(ctx, events)
 	if err != nil {
 		return fmt.Errorf("failed watch node leases: %w", err)
 	}
 
-	logger := log.FromContext(ctx)
-	go func() {
-		err = c.listResources(ctx, opt)
-		if err != nil {
-			logger.Error("Failed list node leases", err)
-		}
-	}()
 	return nil
 }
 
 // watchResources watch resources and send to preprocessChan
-func (c *NodeLeaseController) watchResources(ctx context.Context, opt metav1.ListOptions) error {
-	// Watch node leases in the cluster
-	watcher, err := c.typedClient.CoordinationV1().Leases(c.leaseNamespace).Watch(ctx, opt)
-	if err != nil {
-		return err
-	}
-
+func (c *NodeLeaseController) watchResources(ctx context.Context, events <-chan Event[*coordinationv1.Lease]) error {
 	logger := log.FromContext(ctx)
 	go func() {
-		rc := watcher.ResultChan()
 	loop:
 		for {
 			select {
-			case event, ok := <-rc:
+			case event, ok := <-events:
 				if !ok {
-					for {
-						watcher, err := c.typedClient.CoordinationV1().Leases(c.leaseNamespace).Watch(ctx, opt)
-						if err == nil {
-							rc = watcher.ResultChan()
-							continue loop
-						}
-
-						logger.Error("Failed to watch node leases", err)
-						select {
-						case <-ctx.Done():
-							break loop
-						case <-c.clock.After(time.Second * 5):
-						}
-					}
+					break loop
 				}
 				switch event.Type {
-				case watch.Added, watch.Modified:
-					lease := event.Object.(*coordinationv1.Lease)
+				case Added, Modified, Sync:
+					lease := event.Object
 					c.latestLease.Store(lease.Name, lease)
-				case watch.Deleted:
-					lease := event.Object.(*coordinationv1.Lease)
+				case Deleted:
+					lease := event.Object
 					c.remove(lease.Name)
 				}
 			case <-ctx.Done():
-				watcher.Stop()
 				break loop
 			}
 		}
@@ -175,19 +141,6 @@ func (c *NodeLeaseController) watchResources(ctx context.Context, opt metav1.Lis
 	return nil
 }
 
-// listResources lists all resources and sends to preprocessChan
-func (c *NodeLeaseController) listResources(ctx context.Context, opt metav1.ListOptions) error {
-	listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
-		return c.typedClient.CoordinationV1().Leases(c.leaseNamespace).List(ctx, opts)
-	})
-
-	return listPager.EachListItem(ctx, opt, func(obj runtime.Object) error {
-		lease := obj.(*coordinationv1.Lease)
-		c.latestLease.Store(lease.Name, lease)
-		return nil
-	})
-}
-
 func (c *NodeLeaseController) syncWorker(ctx context.Context) {
 	logger := log.FromContext(ctx)
 	for {
@@ -325,7 +278,7 @@ func (c *NodeLeaseController) ensureLease(ctx context.Context, leaseName string)
 	lease := &coordinationv1.Lease{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      leaseName,
-			Namespace: c.leaseNamespace,
+			Namespace: corev1.NamespaceNodeLease,
 		},
 		Spec: coordinationv1.LeaseSpec{
 			HolderIdentity: &c.holderIdentity,
@@ -340,7 +293,7 @@ func (c *NodeLeaseController) ensureLease(ctx context.Context, leaseName string)
 		}
 	}
 
-	lease, err := c.typedClient.CoordinationV1().Leases(c.leaseNamespace).Create(ctx, lease, metav1.CreateOptions{})
+	lease, err := c.typedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease).Create(ctx, lease, metav1.CreateOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -366,7 +319,7 @@ func (c *NodeLeaseController) renewLease(ctx context.Context, base *coordination
 		}
 	}
 
-	lease, err := c.typedClient.CoordinationV1().Leases(c.leaseNamespace).Update(ctx, lease, metav1.UpdateOptions{})
+	lease, err := c.typedClient.CoordinationV1().Leases(lease.Namespace).Update(ctx, lease, metav1.UpdateOptions{})
 	if err != nil {
 		return nil, false, err
 	}
diff --git a/pkg/kwok/controllers/node_lease_controller_test.go b/pkg/kwok/controllers/node_lease_controller_test.go
index 47b2f9bff..a2a81e96f 100644
--- a/pkg/kwok/controllers/node_lease_controller_test.go
+++ b/pkg/kwok/controllers/node_lease_controller_test.go
@@ -77,7 +77,6 @@ func TestNodeLeaseController(t *testing.T) {
 		LeaseParallelism:    2,
 		RenewInterval:       10 * time.Second,
 		RenewIntervalJitter: 0.04,
-		LeaseNamespace:      corev1.NamespaceNodeLease,
 	})
 	if err != nil {
 		t.Fatal(fmt.Errorf("new node leases controller error: %w", err))
@@ -85,12 +84,21 @@ func TestNodeLeaseController(t *testing.T) {
 
 	ctx := context.Background()
 	ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug))
-	ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
 	t.Cleanup(func() {
 		cancel()
 		time.Sleep(time.Second)
 	})
-	err = nodeLeases.Start(ctx)
+
+	nodeLeasesCh := make(chan Event[*coordinationv1.Lease], 1)
+	nodeLeasesCli := clientset.CoordinationV1().Leases(corev1.NamespaceNodeLease)
+	nodeLeasesInformer := NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
+	err = nodeLeasesInformer.Watch(ctx, Option{}, nodeLeasesCh)
+	if err != nil {
+		t.Fatal(fmt.Errorf("watch node leases error: %w", err))
+	}
+
+	err = nodeLeases.Start(ctx, nodeLeasesCh)
 	if err != nil {
 		t.Fatal(fmt.Errorf("start node leases controller error: %w", err))
 	}
diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go
index 18041bfd0..8ae6ea6a5 100644
--- a/pkg/kwok/controllers/pod_controller.go
+++ b/pkg/kwok/controllers/pod_controller.go
@@ -21,20 +21,15 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"time"
 
 	"github.com/wzshiming/cron"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
-	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/pager"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/utils/clock"
 
@@ -48,8 +43,7 @@ import (
 )
 
 var (
-	deleteOpt        = *metav1.NewDeleteOptions(0)
-	podFieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", "").String()
+	deleteOpt = *metav1.NewDeleteOptions(0)
 )
 
 // PodController is a fake pods implementation that can be used to test
@@ -61,7 +55,6 @@ type PodController struct {
 	disregardStatusWithLabelSelector labels.Selector
 	nodeIP                           string
 	defaultCIDR                      string
-	namespace                        string
 	nodeGetFunc                      func(nodeName string) (*NodeInfo, bool)
 	ipPools                          maps.SyncMap[string, *ipPool]
 	renderer                         gotpl.Renderer
@@ -75,7 +68,6 @@ type PodController struct {
 	delayJobs             jobInfoMap
 	recorder              record.EventRecorder
 	readOnlyFunc          func(nodeName string) bool
-	triggerPreprocessChan chan string
 	enableMetrics         bool
 }
 
@@ -93,7 +85,6 @@ type PodControllerConfig struct {
 	DisregardStatusWithLabelSelector string
 	NodeIP                           string
 	CIDR                             string
-	Namespace                        string
 	NodeGetFunc                      func(nodeName string) (*NodeInfo, bool)
 	NodeHasMetric                    func(nodeName string) bool
 	Lifecycle                        resources.Getter[Lifecycle]
@@ -132,13 +123,11 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
 		disregardStatusWithLabelSelector: disregardStatusWithLabelSelector,
 		nodeIP:                           conf.NodeIP,
 		defaultCIDR:                      conf.CIDR,
-		namespace:                        conf.Namespace,
 		nodeGetFunc:                      conf.NodeGetFunc,
 		cronjob:                          cron.NewCron(),
 		lifecycle:                        conf.Lifecycle,
 		playStageParallelism:             conf.PlayStageParallelism,
 		preprocessChan:                   make(chan *corev1.Pod),
-		triggerPreprocessChan:            make(chan string, 64),
 		playStageChan:                    make(chan resourceStageJob[*corev1.Pod]),
 		recorder:                         conf.Recorder,
 		readOnlyFunc:                     conf.ReadOnlyFunc,
@@ -159,28 +148,17 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
 
 // Start starts the fake pod controller
 // It will modify the pods status to what we want
-func (c *PodController) Start(ctx context.Context) error {
+func (c *PodController) Start(ctx context.Context, events <-chan Event[*corev1.Pod]) error {
 	go c.preprocessWorker(ctx)
-	go c.triggerPreprocessWorker(ctx)
 	for i := uint(0); i < c.playStageParallelism; i++ {
 		go c.playStageWorker(ctx)
 	}
 
-	opt := metav1.ListOptions{
-		FieldSelector: podFieldSelector,
-	}
-	err := c.watchResources(ctx, opt)
+	err := c.watchResources(ctx, events)
 	if err != nil {
 		return fmt.Errorf("failed watch pods: %w", err)
 	}
 
-	logger := log.FromContext(ctx)
-	go func() {
-		err = c.listResources(ctx, opt)
-		if err != nil {
-			logger.Error("Failed list pods", err)
-		}
-	}()
 	return nil
 }
 
@@ -258,27 +236,6 @@ func (c *PodController) preprocessWorker(ctx context.Context) {
 	}
 }
 
-// triggerPreprocessWorker receives the resource from the triggerPreprocessChan and preprocess it
-func (c *PodController) triggerPreprocessWorker(ctx context.Context) {
-	logger := log.FromContext(ctx)
-	for {
-		select {
-		case <-ctx.Done():
-			logger.Debug("Stop trigger preprocess worker")
-			return
-		case nodeName := <-c.triggerPreprocessChan:
-			err := c.listResources(ctx, metav1.ListOptions{
-				FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
-			})
-			if err != nil {
-				logger.Error("Failed to preprocess node", err,
-					"node", nodeName,
-				)
-			}
-		}
-	}
-}
-
 // preprocess the pod and send it to the playStageWorker
 func (c *PodController) preprocess(ctx context.Context, pod *corev1.Pod) error {
 	key := log.KObj(pod).String()
@@ -462,39 +419,21 @@ func (c *PodController) need(pod *corev1.Pod) bool {
 }
 
 // watchResources watch resources and send to preprocessChan
-func (c *PodController) watchResources(ctx context.Context, opt metav1.ListOptions) error {
-	watcher, err := c.typedClient.CoreV1().Pods(c.namespace).Watch(ctx, opt)
-	if err != nil {
-		return err
-	}
+func (c *PodController) watchResources(ctx context.Context, events <-chan Event[*corev1.Pod]) error {
 	logger := log.FromContext(ctx)
 	go func() {
-		rc := watcher.ResultChan()
 	loop:
 		for {
 			select {
-			case event, ok := <-rc:
+			case event, ok := <-events:
 				if !ok {
-					for {
-						watcher, err := c.typedClient.CoreV1().Pods(c.namespace).Watch(ctx, opt)
-						if err == nil {
-							rc = watcher.ResultChan()
-							continue loop
-						}
-
-						logger.Error("Failed to watch pods", err)
-						select {
-						case <-ctx.Done():
-							break loop
-						case <-c.clock.After(time.Second * 5):
-						}
-					}
+					break loop
 				}
 				switch event.Type {
-				case watch.Added, watch.Modified:
-					pod := event.Object.(*corev1.Pod)
+				case Added, Modified, Sync:
+					pod := event.Object
 					if c.enableMetrics {
 						c.putPodInfo(pod)
 					}
@@ -519,15 +458,15 @@ func (c *PodController) watchResources(ctx context.Context, opt metav1.ListOptio
 					}
 
 					if c.enableMetrics &&
-						event.Type == watch.Added &&
+						event.Type == Added &&
 						c.nodeGetFunc != nil {
 						nodeInfo, ok := c.nodeGetFunc(pod.Spec.NodeName)
 						if ok {
 							nodeInfo.StartedContainer.Add(int64(len(pod.Spec.Containers)))
 						}
 					}
-				case watch.Deleted:
-					pod := event.Object.(*corev1.Pod)
+				case Deleted:
+					pod := event.Object
 					if c.enableMetrics {
 						c.deletePodInfo(pod)
 					}
@@ -544,7 +483,6 @@ func (c *PodController) watchResources(ctx context.Context, opt metav1.ListOptio
 				}
 			case <-ctx.Done():
-				watcher.Stop()
 				break loop
 			}
 		}
@@ -554,39 +492,6 @@ func (c *PodController) watchResources(ctx context.Context, opt metav1.ListOptio
 	return nil
 }
 
-// listResources lists all resources and sends to preprocessChan
-func (c *PodController) listResources(ctx context.Context, opt metav1.ListOptions) error {
-	listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
-		return c.typedClient.CoreV1().Pods(c.namespace).List(ctx, opts)
-	})
-
-	logger := log.FromContext(ctx)
-
-	return listPager.EachListItem(ctx, opt, func(obj runtime.Object) error {
-		pod := obj.(*corev1.Pod)
-		if c.enableMetrics {
-			c.putPodInfo(pod)
-		}
-		if c.need(pod) {
-			if c.readOnly(pod.Spec.NodeName) {
-				logger.Debug("Skip pod",
-					"pod", log.KObj(pod),
-					"node", pod.Spec.NodeName,
-					"reason", "read only",
-				)
-			} else {
-				c.preprocessChan <- pod.DeepCopy()
-			}
-		}
-		return nil
-	})
-}
-
-// PlayStagePodsOnNode plays stage pods on node
-func (c *PodController) PlayStagePodsOnNode(nodeName string) {
-	c.triggerPreprocessChan <- nodeName
-}
-
 // ipPool returns the ipPool for the given cidr
 func (c *PodController) ipPool(cidr string) (*ipPool, error) {
 	pool, ok := c.ipPools.Load(cidr)
diff --git a/pkg/kwok/controllers/pod_controller_test.go b/pkg/kwok/controllers/pod_controller_test.go
index c0ccfceac..cb10c9293 100644
--- a/pkg/kwok/controllers/pod_controller_test.go
+++ b/pkg/kwok/controllers/pod_controller_test.go
@@ -26,6 +26,7 @@ import (
 
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes/fake"
 
@@ -195,13 +196,23 @@ func TestPodController(t *testing.T) {
 	ctx := context.Background()
 	ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug))
-	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
 	t.Cleanup(func() {
 		cancel()
 		time.Sleep(time.Second)
 	})
 
-	err = pods.Start(ctx)
+	podsCh := make(chan Event[*corev1.Pod], 1)
+	podsCli := clientset.CoreV1().Pods(corev1.NamespaceAll)
+	podsInformer := NewInformer[*corev1.Pod, *corev1.PodList](podsCli)
+	err = podsInformer.Watch(ctx, Option{
+		FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", "").String(),
+	}, podsCh)
+	if err != nil {
+		t.Fatal(fmt.Errorf("watch pods error: %w", err))
+	}
+
+	err = pods.Start(ctx, podsCh)
 	if err != nil {
 		t.Fatal(fmt.Errorf("start pods controller error: %w", err))
 	}
@@ -254,27 +265,6 @@ func TestPodController(t *testing.T) {
 		t.Fatal(fmt.Errorf("create pod1 error: %w", err))
 	}
 
-	pod1, err := clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
-	if err != nil {
-		t.Fatal(fmt.Errorf("get pod1 error: %w", err))
-	}
-	pod1.Annotations = map[string]string{
-		"fake": "custom",
-	}
-	pod1.Status.Reason = "custom"
-	_, err = clientset.CoreV1().Pods("default").Update(ctx, pod1, metav1.UpdateOptions{})
-	if err != nil {
-		t.Fatal(fmt.Errorf("update pod1 error: %w", err))
-	}
-
-	pod1, err = clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
-	if err != nil {
-		t.Fatal(fmt.Errorf("get pod1 error: %w", err))
-	}
-	if pod1.Status.Reason != "custom" {
-		t.Fatal(fmt.Errorf("pod1 status reason not custom"))
-	}
-
 	var list *corev1.PodList
 	err = wait.Poll(ctx, func(ctx context.Context) (done bool, err error) {
 		list, err = clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
@@ -310,11 +300,32 @@ func TestPodController(t *testing.T) {
 			}
 		}
 		return true, nil
-	}, wait.WithContinueOnError(5))
+	}, wait.WithContinueOnError(10))
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	pod1, err := clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
+	if err != nil {
+		t.Fatal(fmt.Errorf("get pod1 error: %w", err))
+	}
+	pod1.Annotations = map[string]string{
+		"fake": "custom",
+	}
+	pod1.Status.Reason = "custom"
+	_, err = clientset.CoreV1().Pods("default").Update(ctx, pod1, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatal(fmt.Errorf("update pod1 error: %w", err))
+	}
+
+	pod1, err = clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
+	if err != nil {
+		t.Fatal(fmt.Errorf("get pod1 error: %w", err))
+	}
+	if pod1.Status.Reason != "custom" {
+		t.Fatal(fmt.Errorf("pod1 status reason not custom"))
+	}
+
 	pod, ok := slices.Find(list.Items, func(pod corev1.Pod) bool {
 		return pod.Name == "pod0"
 	})
diff --git a/test/kwokctl/kwokctl_benchmark_test.sh b/test/kwokctl/kwokctl_benchmark_test.sh
index a62dc8b3c..7f1801d4c 100755
--- a/test/kwokctl/kwokctl_benchmark_test.sh
+++ b/test/kwokctl/kwokctl_benchmark_test.sh
@@ -89,7 +89,7 @@ function main() {
     echo "Benchmarking on ${KWOK_RUNTIME}"
     name="benchmark-${KWOK_RUNTIME}"
-    create_cluster "${name}" "${release}"
+    create_cluster "${name}" "${release}" --disable-qps-limits
     child_timeout 120 scale_create_node "${name}" 1000 || failed+=("scale_create_node_timeout_${name}")
    child_timeout 120 scale_create_pod "${name}" 1000 || failed+=("scale_create_pod_timeout_${name}")
    child_timeout 120 scale_delete_pod "${name}" 0 || failed+=("scale_delete_pod_timeout_${name}")
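
Summary of the new wiring: the controllers no longer list and watch the apiserver themselves; the caller builds one Informer per resource and hands each controller a channel of events, which also lets onLeaseNodeManageFunc trigger targeted resyncs via Informer.Update. A minimal sketch mirroring controller.go (clientset, ctx, and a configured PodController named pods are assumed to exist):

	podsCh := make(chan Event[*corev1.Pod], 64)
	podsInformer := NewInformer[*corev1.Pod, *corev1.PodList](clientset.CoreV1().Pods(corev1.NamespaceAll))
	// Only pods already scheduled to a node are interesting to kwok.
	if err := podsInformer.Watch(ctx, Option{
		FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", "").String(),
	}, podsCh); err != nil {
		return fmt.Errorf("failed to watch pods: %w", err)
	}
	if err := pods.Start(ctx, podsCh); err != nil {
		return fmt.Errorf("failed to start pods controller: %w", err)
	}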