Skip to content

Commit

Permalink
Refactor the events
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Aug 10, 2023
1 parent 95aa17a commit 37f5d2c
Show file tree
Hide file tree
Showing 17 changed files with 982 additions and 680 deletions.
4 changes: 3 additions & 1 deletion pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,14 @@ func runE(ctx context.Context, flags *flagpole) error {
}
ctx = log.NewContext(ctx, logger.With("id", id))

enableMetrics := len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind)
ctr, err := controllers.NewController(controllers.Config{
Clock: clock.RealClock{},
TypedClient: typedClient,
TypedKwokClient: typedKwokClient,
EnableCNI: flags.Options.EnableCNI,
EnableMetrics: len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind),
EnableMetrics: enableMetrics,
EnablePodCache: enableMetrics,
ManageAllNodes: flags.Options.ManageAllNodes,
ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector,
ManageNodesWithLabelSelector: flags.Options.ManageNodesWithLabelSelector,
Expand Down
150 changes: 108 additions & 42 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
"strings"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand All @@ -43,6 +44,8 @@ import (
"sigs.k8s.io/kwok/pkg/consts"
"sigs.k8s.io/kwok/pkg/log"
"sigs.k8s.io/kwok/pkg/utils/gotpl"
"sigs.k8s.io/kwok/pkg/utils/informer"
"sigs.k8s.io/kwok/pkg/utils/queue"
"sigs.k8s.io/kwok/pkg/utils/slices"
)

Expand Down Expand Up @@ -86,6 +89,8 @@ var (
return consts.Version
},
}

nodeKind = corev1.SchemeGroupVersion.WithKind("Node")
)

// Controller is a fake kubelet implementation that can be used to test
Expand All @@ -96,6 +101,9 @@ type Controller struct {
nodeLeases *NodeLeaseController
broadcaster record.EventBroadcaster
typedClient kubernetes.Interface

nodeCacheGetter informer.Getter[*corev1.Node]
podCacheGetter informer.Getter[*corev1.Pod]
}

// Config is the configuration for the controller
Expand All @@ -121,6 +129,7 @@ type Config struct {
NodeLeaseParallelism uint
ID string
EnableMetrics bool
EnablePodCache bool
}

// NewController creates a new fake kubelet controller
Expand Down Expand Up @@ -155,47 +164,36 @@ func (c *Controller) Start(ctx context.Context) error {
recorder := c.broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "kwok_controller"})

var (
err error
nodeLeases *NodeLeaseController
nodeLeasesChan chan informer.Event[*coordinationv1.Lease]
getNodeOwnerFunc func(nodeName string) []metav1.OwnerReference
onLeaseNodeManageFunc func(nodeName string)
onNodeManagedFunc func(nodeName string)
readOnlyFunc func(nodeName string) bool
nodeSelectorFunc func(node *corev1.Node) bool
)
switch {
case conf.ManageAllNodes:
nodeSelectorFunc = func(node *corev1.Node) bool {
return true
}
case conf.ManageNodesWithAnnotationSelector != "":
selector, err := labels.Parse(conf.ManageNodesWithAnnotationSelector)

if conf.NodeLeaseDurationSeconds != 0 {
nodeLeasesChan = make(chan informer.Event[*coordinationv1.Lease], 1)
nodeLeasesCli := conf.TypedClient.CoordinationV1().Leases(corev1.NamespaceNodeLease)
nodeLeasesInformer := informer.NewInformer[*coordinationv1.Lease, *coordinationv1.LeaseList](nodeLeasesCli)
err = nodeLeasesInformer.Watch(ctx, informer.Option{}, nodeLeasesChan)
if err != nil {
return err
}
nodeSelectorFunc = func(node *corev1.Node) bool {
return selector.Matches(labels.Set(node.Annotations))
return fmt.Errorf("failed to watch nodes: %w", err)
}
case conf.ManageNodesWithLabelSelector != "":
// client-go supports label filtering, so return true is ok.
nodeSelectorFunc = func(node *corev1.Node) bool {
return true
}
}

if conf.NodeLeaseDurationSeconds != 0 {
leaseDuration := time.Duration(conf.NodeLeaseDurationSeconds) * time.Second
// https://github.com/kubernetes/kubernetes/blob/02f4d643eae2e225591702e1bbf432efea453a26/pkg/kubelet/kubelet.go#L199-L200
renewInterval := leaseDuration / 4
// https://github.com/kubernetes/component-helpers/blob/d17b6f1e84500ee7062a26f5327dc73cb3e9374a/apimachinery/lease/controller.go#L100
renewIntervalJitter := 0.04
l, err := NewNodeLeaseController(NodeLeaseControllerConfig{
nodeLeases, err = NewNodeLeaseController(NodeLeaseControllerConfig{
Clock: conf.Clock,
TypedClient: conf.TypedClient,
LeaseDurationSeconds: conf.NodeLeaseDurationSeconds,
LeaseParallelism: conf.NodeLeaseParallelism,
RenewInterval: renewInterval,
RenewIntervalJitter: renewIntervalJitter,
LeaseNamespace: corev1.NamespaceNodeLease,
MutateLeaseFunc: setNodeOwnerFunc(func(nodeName string) []metav1.OwnerReference {
return getNodeOwnerFunc(nodeName)
}),
Expand All @@ -207,7 +205,6 @@ func (c *Controller) Start(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to create node leases controller: %w", err)
}
nodeLeases = l

// Not holding the lease means the node is not managed
readOnlyFunc = func(nodeName string) bool {
Expand Down Expand Up @@ -289,16 +286,44 @@ func (c *Controller) Start(ctx context.Context) error {
nodeLifecycleGetter = resources.NewStaticGetter(lifecycle)
}

nodeChan := make(chan informer.Event[*corev1.Node], 1)
nodesCli := conf.TypedClient.CoreV1().Nodes()
nodesInformer := informer.NewInformer[*corev1.Node, *corev1.NodeList](nodesCli)
nodesCache, err := nodesInformer.WatchWithCache(ctx, informer.Option{
LabelSelector: conf.ManageNodesWithLabelSelector,
AnnotationSelector: conf.ManageNodesWithAnnotationSelector,
}, nodeChan)
if err != nil {
return fmt.Errorf("failed to watch nodes: %w", err)
}

podsChan := make(chan informer.Event[*corev1.Pod], 1)
podsCli := conf.TypedClient.CoreV1().Pods(corev1.NamespaceAll)
podsInformer := informer.NewInformer[*corev1.Pod, *corev1.PodList](podsCli)

podWatchOption := informer.Option{
FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", "").String(),
}

var podsCache informer.Getter[*corev1.Pod]
if conf.EnablePodCache {
podsCache, err = podsInformer.WatchWithCache(ctx, podWatchOption, podsChan)
} else {
err = podsInformer.Watch(ctx, podWatchOption, podsChan)
}
if err != nil {
return fmt.Errorf("failed to watch pods: %w", err)
}

nodes, err := NewNodeController(NodeControllerConfig{
Clock: conf.Clock,
TypedClient: conf.TypedClient,
NodeCacheGetter: nodesCache,
NodeIP: conf.NodeIP,
NodeName: conf.NodeName,
NodePort: conf.NodePort,
DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector,
DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
ManageNodesWithLabelSelector: conf.ManageNodesWithLabelSelector,
NodeSelectorFunc: nodeSelectorFunc,
OnNodeManagedFunc: func(nodeName string) {
onNodeManagedFunc(nodeName)
},
Expand All @@ -317,13 +342,13 @@ func (c *Controller) Start(ctx context.Context) error {
Clock: conf.Clock,
EnableCNI: conf.EnableCNI,
TypedClient: conf.TypedClient,
NodeCacheGetter: nodesCache,
NodeIP: conf.NodeIP,
CIDR: conf.CIDR,
DisregardStatusWithAnnotationSelector: conf.DisregardStatusWithAnnotationSelector,
DisregardStatusWithLabelSelector: conf.DisregardStatusWithLabelSelector,
Lifecycle: podLifecycleGetter,
PlayStageParallelism: conf.PodPlayStageParallelism,
Namespace: corev1.NamespaceAll,
NodeGetFunc: nodes.Get,
FuncMap: defaultFuncMap,
Recorder: recorder,
Expand All @@ -334,68 +359,109 @@ func (c *Controller) Start(ctx context.Context) error {
return fmt.Errorf("failed to create pods controller: %w", err)
}

podOnNodeManageQueue := queue.NewQueue[string]()
if nodeLeases != nil {
nodeManageQueue := queue.NewQueue[string]()
getNodeOwnerFunc = func(nodeName string) []metav1.OwnerReference {
nodeInfo, ok := nodes.Get(nodeName)
if !ok || nodeInfo == nil {
node, ok := nodesCache.Get(nodeName)
if !ok {
return nil
}
return nodeInfo.OwnerReferences
ownerReferences := []metav1.OwnerReference{
{
APIVersion: nodeKind.Version,
Kind: nodeKind.Kind,
Name: node.Name,
UID: node.UID,
},
}
return ownerReferences
}
onLeaseNodeManageFunc = func(nodeName string) {
// Manage the node and play stage all pods on the node
nodes.Manage(nodeName)
pods.PlayStagePodsOnNode(nodeName)
nodeManageQueue.Add(nodeName)
podOnNodeManageQueue.Add(nodeName)
}

onNodeManagedFunc = func(nodeName string) {
// Try to hold the lease
nodeLeases.TryHold(nodeName)
}

go func() {
for {
nodeName := nodeManageQueue.GetOrWait()
err := nodesInformer.SyncOne(ctx, nodeName, nodeChan)
if err != nil {
logger.Error("failed to update node", err, "node", nodeName)
}
}
}()
} else {
onNodeManagedFunc = func(nodeName string) {
// Play stage all pods on the node
pods.PlayStagePodsOnNode(nodeName)
podOnNodeManageQueue.Add(nodeName)
}
}

go func() {
for {
nodeName := podOnNodeManageQueue.GetOrWait()
err = podsInformer.Sync(ctx, informer.Option{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
}, podsChan)
if err != nil {
logger.Error("failed to update pods on node", err, "node", nodeName)
}
}
}()

c.broadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: c.typedClient.CoreV1().Events("")})
if nodeLeases != nil {
err := nodeLeases.Start(ctx)
err := nodeLeases.Start(ctx, nodeLeasesChan)
if err != nil {
return fmt.Errorf("failed to start node leases controller: %w", err)
}
}
err = pods.Start(ctx)
err = pods.Start(ctx, podsChan)
if err != nil {
return fmt.Errorf("failed to start pods controller: %w", err)
}
err = nodes.Start(ctx)
err = nodes.Start(ctx, nodeChan)
if err != nil {
return fmt.Errorf("failed to start nodes controller: %w", err)
}

c.pods = pods
c.nodes = nodes
c.nodeLeases = nodeLeases
c.nodeCacheGetter = nodesCache
c.podCacheGetter = podsCache
return nil
}

// GetNode returns the node with the given name
func (c *Controller) GetNode(nodeName string) (*NodeInfo, bool) {
// GetNodeInfo returns the node info for the given node
func (c *Controller) GetNodeInfo(nodeName string) (*NodeInfo, bool) {
return c.nodes.Get(nodeName)
}

// ListNodes returns all nodes
func (c *Controller) ListNodes() []*NodeInfo {
func (c *Controller) ListNodes() []string {
return c.nodes.List()
}

// ListPods returns all pods on the given node
func (c *Controller) ListPods(nodeName string) ([]*PodInfo, bool) {
func (c *Controller) ListPods(nodeName string) ([]log.ObjectRef, bool) {
return c.pods.List(nodeName)
}

// GetPodCache returns the pod cache getter stored on the controller.
//
// NOTE(review): judging by Start, the field is only assigned once Start
// succeeds, and only holds a cache when Config.EnablePodCache is set;
// otherwise the result is presumably nil — confirm before dereferencing.
func (c *Controller) GetPodCache() informer.Getter[*corev1.Pod] {
	podCache := c.podCacheGetter
	return podCache
}

// GetNodeCache returns the node cache getter stored on the controller.
//
// NOTE(review): judging by Start, the field is assigned from the node
// informer's cache only after Start succeeds; before that the result is
// presumably nil — confirm before dereferencing.
func (c *Controller) GetNodeCache() informer.Getter[*corev1.Node] {
	nodeCache := c.nodeCacheGetter
	return nodeCache
}

// Identity returns a unique identifier for this controller
func Identity() (string, error) {
hostname, err := os.Hostname()
Expand Down
18 changes: 9 additions & 9 deletions pkg/kwok/controllers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,16 @@ func TestController(t *testing.T) {
},
}

ctx := context.Background()
ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug))
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
t.Cleanup(func() {
cancel()
time.Sleep(time.Second)
})

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
ctx = log.NewContext(ctx, log.NewLogger(os.Stderr, log.LevelDebug))
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
t.Cleanup(func() {
cancel()
time.Sleep(time.Second)
})

ctr, err := NewController(tt.conf)
if (err != nil) != tt.wantErr {
t.Fatalf("NewController() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -180,7 +180,7 @@ func TestController(t *testing.T) {
}
}
return true, nil
}, wait.WithContinueOnError(5))
}, wait.WithContinueOnError(10))
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 37f5d2c

Please sign in to comment.