Skip to content

Commit

Permalink
Refactor the events
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Aug 8, 2023
1 parent 9b6d89e commit 927ce7d
Show file tree
Hide file tree
Showing 11 changed files with 505 additions and 368 deletions.
84 changes: 54 additions & 30 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
"strings"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -155,34 +156,24 @@ func (c *Controller) Start(ctx context.Context) error {
recorder := c.broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"})

var (
err error
nodeLeases *NodeLeaseController
nodeLeasesCh chan Event[*coordinationv1.Lease]
getNodeOwnerFunc func(nodeName string) []metav1.OwnerReference
onLeaseNodeManageFunc func(nodeName string)
onNodeManagedFunc func(nodeName string)
readOnlyFunc func(nodeName string) bool
nodeSelectorFunc func(node *corev1.Node) bool
)
switch {
case conf.ManageAllNodes:
nodeSelectorFunc = func(node *corev1.Node) bool {
return true
}
case conf.ManageNodesWithAnnotationSelector != "":
selector, err := labels.Parse(conf.ManageNodesWithAnnotationSelector)

if conf.NodeLeaseDurationSeconds != 0 {
nodeLeasesCh = make(chan Event[*coordinationv1.Lease], 64)
nodeLeasesCli := conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)
nodeLeasesInformer := NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
err = nodeLeasesInformer.Watch(ctx, Option{}, nodeLeasesCh)
if err != nil {
return err
return fmt.Errorf("failed to watch nodes: %w", err)
}
nodeSelectorFunc = func(node *corev1.Node) bool {
return selector.Matches(labels.Set(node.Annotations))
}
case conf.ManageNodesWithLabelSelector != "":
// client-go supports label filtering, so return true is ok.
nodeSelectorFunc = func(node *corev1.Node) bool {
return true
}
}

if conf.NodeLeaseDurationSeconds != 0 {
leaseDuration := time.Duration(conf.NodeLeaseDurationSeconds) * time.Second
// https://github.com/kubernetes/kubernetes/blob/02f4d643eae2e225591702e1bbf432efea453a26/pkg/kubelet/kubelet.go#L199-L200
renewInterval := leaseDuration / 4
Expand All @@ -195,7 +186,6 @@ func (c *Controller) Start(ctx context.Context) error {
LeaseParallelism: conf.NodeLeaseParallelism,
RenewInterval: renewInterval,
RenewIntervalJitter: renewIntervalJitter,
LeaseNamespace: corev1.NamespaceNodeLease,
MutateLeaseFunc: setNodeOwnerFunc(func(nodeName string) []metav1.OwnerReference {
return getNodeOwnerFunc(nodeName)
}),
Expand Down Expand Up @@ -289,6 +279,27 @@ func (c *Controller) Start(ctx context.Context) error {
nodeLifecycleGetter = resources.NewStaticGetter(lifecycle)
}

nodeCh := make(chan Event[*corev1.Node], 64)
nodesCli := conf.TypedClient.CoreV1().Nodes()
nodesInformer := NewInformer[*corev1.Node, *corev1.NodeList](nodesCli)
err = nodesInformer.Watch(ctx, Option{
LabelSelector: conf.ManageNodesWithLabelSelector,
AnnotationSelector: conf.ManageNodesWithAnnotationSelector,
}, nodeCh)
if err != nil {
return fmt.Errorf("failed to watch nodes: %w", err)
}

podsCh := make(chan Event[*corev1.Pod], 64)
podsCli := conf.TypedClient.CoreV1().Pods(corev1.NamespaceAll)
podsInformer := NewInformer[*corev1.Pod, *corev1.PodList](podsCli)
err = podsInformer.Watch(ctx, Option{
FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", "").String(),
}, podsCh)
if err != nil {
return fmt.Errorf("failed to watch pods: %w", err)
}

nodes, err := NewNodeController(NodeControllerConfig{
Clock: conf.Clock,
TypedClient: conf.TypedClient,
Expand All @@ -297,8 +308,6 @@ func (c *Controller) Start(ctx context.Context) error {
NodePort: conf.NodePort,
DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector,
DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
ManageNodesWithLabelSelector: conf.ManageNodesWithLabelSelector,
NodeSelectorFunc: nodeSelectorFunc,
OnNodeManagedFunc: func(nodeName string) {
onNodeManagedFunc(nodeName)
},
Expand All @@ -323,7 +332,6 @@ func (c *Controller) Start(ctx context.Context) error {
DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
Lifecycle: podLifecycleGetter,
PlayStageParallelism: conf.PodPlayStageParallelism,
Namespace: corev1.NamespaceAll,
NodeGetFunc: nodes.Get,
FuncMap: defaultFuncMap,
Recorder: recorder,
Expand All @@ -344,8 +352,19 @@ func (c *Controller) Start(ctx context.Context) error {
}
onLeaseNodeManageFunc = func(nodeName string) {
// Manage the node and play stage all pods on the node
nodes.Manage(nodeName)
pods.PlayStagePodsOnNode(nodeName)
err := nodesInformer.Update(ctx, Option{
FieldSelector: fields.OneTermEqualSelector("metadata.name", nodeName).String(),
}, nodeCh)
if err != nil {
logger.Error("failed to update node", err, "node", nodeName)
}

err = podsInformer.Update(ctx, Option{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
}, podsCh)
if err != nil {
logger.Error("failed to update pods on node", err, "node", nodeName)
}
}

onNodeManagedFunc = func(nodeName string) {
Expand All @@ -355,22 +374,27 @@ func (c *Controller) Start(ctx context.Context) error {
} else {
onNodeManagedFunc = func(nodeName string) {
// Play stage all pods on the node
pods.PlayStagePodsOnNode(nodeName)
err := podsInformer.Update(ctx, Option{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
}, podsCh)
if err != nil {
logger.Error("failed to update pods on node", err, "node", nodeName)
}
}
}

c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.typedClient.CoreV1().Events("")})
if nodeLeases != nil {
err := nodeLeases.Start(ctx)
err := nodeLeases.Start(ctx, nodeLeasesCh)
if err != nil {
return fmt.Errorf("failed to start node leases controller: %w", err)
}
}
err = pods.Start(ctx)
err = pods.Start(ctx, podsCh)
if err != nil {
return fmt.Errorf("failed to start pods controller: %w", err)
}
err = nodes.Start(ctx)
err = nodes.Start(ctx, nodeCh)
if err != nil {
return fmt.Errorf("failed to start nodes controller: %w", err)
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/kwok/controllers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,16 @@ func TestController(t *testing.T) {
},
}

ctx := context.Background()
ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug))
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
t.Cleanup(func() {
cancel()
time.Sleep(time.Second)
})

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug))
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
t.Cleanup(func() {
cancel()
time.Sleep(time.Second)
})

ctr, err := NewController(tt.conf)
if (err != nil) != tt.wantErr {
t.Fatalf("NewController() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -180,7 +180,7 @@ func TestController(t *testing.T) {
}
}
return true, nil
}, wait.WithContinueOnError(5))
}, wait.WithContinueOnError(10))
if err != nil {
t.Fatal(err)
}
Expand Down
106 changes: 106 additions & 0 deletions pkg/kwok/controllers/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)

// EventType defines the possible types of events.
type EventType string

// Event types.
const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
Sync EventType = "SYNC"
)

// Event represents a single event to a watched resource.
type Event[T runtime.Object] struct {
// Type is Added, Modified, Deleted, or Sync.
Type EventType

// Object is:
// * If Type is Added, Modified or Sync: the new state of the object.
// * If Type is Deleted: the state of the object immediately before deletion.
Object T
}

// ListWatcher is an interface for objects that know how to list and watch themselves.
type ListWatcher[L runtime.Object] interface {
// List returns an object containing a list of the resources matching the provided options.
List(ctx context.Context, opts metav1.ListOptions) (L, error)
// Watch returns an object that watches the resources matching the provided options.
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
}

// Option is used to filter events.
type Option struct {
LabelSelector string
FieldSelector string
AnnotationSelector string
annotationSelector labels.Selector
}

func (o Option) setup(opts *metav1.ListOptions) {
if o.LabelSelector != "" {
opts.LabelSelector = o.LabelSelector
}
if o.FieldSelector != "" {
opts.FieldSelector = o.FieldSelector
}
}

func (o Option) toListOptions() metav1.ListOptions {
opts := metav1.ListOptions{}
o.setup(&opts)
return opts
}

func (o Option) filter(obj any) (bool, error) {
if o.AnnotationSelector == "" {
return true, nil
}

if o.annotationSelector == nil {
var err error
o.annotationSelector, err = labels.Parse(o.AnnotationSelector)
if err != nil {
return false, err
}
}

accessor, err := meta.Accessor(obj)
if err != nil {
return false, err
}

annotations := accessor.GetAnnotations()
if len(annotations) == 0 {
return false, nil
}

return o.annotationSelector.Matches(labels.Set(annotations)), nil
}
Loading

0 comments on commit 927ce7d

Please sign in to comment.