Skip to content

Commit

Permalink
Periodic sync of node status
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Aug 15, 2023
1 parent 8ab0b37 commit f0b9c44
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 27 deletions.
1 change: 1 addition & 0 deletions kustomize/stage/fast/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- ../node/fast
- ../node/heartbeat-with-lease
- ../pod/fast
8 changes: 8 additions & 0 deletions kustomize/stage/node/heartbeat-with-lease/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Node Heartbeat Stage With Lease

This Stage configures the node heartbeat with lease.

Some controllers rely on node conditions, so we synchronize the node information periodically, just like Kubelet.

The `node-heartbeat-with-lease` Stage is applied to nodes that have the `Ready` condition set to `True` in their `status.conditions` field.
When applied, this Stage maintains the `status.conditions`, `status.addresses` and `status.daemonEndpoints` fields for the node.
28 changes: 28 additions & 0 deletions kustomize/stage/node/heartbeat-with-lease/embed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package heartbeat_with_lease contains the node heartbeat with lease for kwok.
package heartbeat_with_lease //nolint:revive

import (
_ "embed"
)

var (
// DefaultNodeHeartbeatWithLease is the default node heartbeat yaml.
//go:embed node-heartbeat-with-lease.yaml
DefaultNodeHeartbeatWithLease string
)
4 changes: 4 additions & 0 deletions kustomize/stage/node/heartbeat-with-lease/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- node-heartbeat-with-lease.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
apiVersion: kwok.x-k8s.io/v1alpha1
kind: Stage
metadata:
name: node-heartbeat-with-lease
spec:
resourceRef:
apiGroup: v1
kind: Node
selector:
matchExpressions:
- key: '.status.phase'
operator: 'In'
values:
- 'Running'
- key: '.status.conditions.[] | select( .type == "Ready" ) | .status'
operator: 'In'
values:
- 'True'
delay:
durationMilliseconds: 600000
jitterDurationMilliseconds: 610000
next:
statusTemplate: |
{{ $now := Now }}
{{ $lastTransitionTime := or .creationTimestamp $now }}
conditions:
{{ range NodeConditions }}
- lastHeartbeatTime: {{ $now | Quote }}
lastTransitionTime: {{ $lastTransitionTime | Quote }}
message: {{ .message | Quote }}
reason: {{ .reason | Quote }}
status: {{ .status | Quote }}
type: {{ .type | Quote }}
{{ end }}
addresses:
{{ with .status.addresses }}
{{ YAML . 1 }}
{{ else }}
{{ with NodeIP }}
- address: {{ . | Quote }}
type: InternalIP
{{ end }}
{{ with NodeName }}
- address: {{ . | Quote }}
type: Hostname
{{ end }}
{{ end }}
{{ with NodePort }}
daemonEndpoints:
kubeletEndpoint:
Port: {{ . }}
{{ end }}
26 changes: 15 additions & 11 deletions pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

nodefast "sigs.k8s.io/kwok/kustomize/stage/node/fast"
nodeheartbeat "sigs.k8s.io/kwok/kustomize/stage/node/heartbeat"
nodeheartbeatwithlease "sigs.k8s.io/kwok/kustomize/stage/node/heartbeat-with-lease"
podfast "sigs.k8s.io/kwok/kustomize/stage/pod/fast"
"sigs.k8s.io/kwok/pkg/apis/internalversion"
"sigs.k8s.io/kwok/pkg/apis/v1alpha1"
Expand Down Expand Up @@ -395,7 +396,7 @@ func waitForReady(ctx context.Context, clientset kubernetes.Interface) error {
return nil
}

func getDefaultNodeStages(heartbeat bool) ([]*internalversion.Stage, error) {
func getDefaultNodeStages(lease bool) ([]*internalversion.Stage, error) {
nodeStages := []*internalversion.Stage{}
nodeInit, err := config.Unmarshal([]byte(nodefast.DefaultNodeInit))
if err != nil {
Expand All @@ -407,17 +408,20 @@ func getDefaultNodeStages(heartbeat bool) ([]*internalversion.Stage, error) {
}
nodeStages = append(nodeStages, nodeInitStage)

if heartbeat {
nodeHeartbeat, err := config.Unmarshal([]byte(nodeheartbeat.DefaultNodeHeartbeat))
if err != nil {
return nil, err
}
nodeHeartbeatStage, ok := nodeHeartbeat.(*internalversion.Stage)
if !ok {
return nil, fmt.Errorf("failed to convert node init to stage")
}
nodeStages = append(nodeStages, nodeHeartbeatStage)
rawHeartbeat := nodeheartbeat.DefaultNodeHeartbeat
if lease {
rawHeartbeat = nodeheartbeatwithlease.DefaultNodeHeartbeatWithLease
}

nodeHeartbeat, err := config.Unmarshal([]byte(rawHeartbeat))
if err != nil {
return nil, err
}
nodeHeartbeatStage, ok := nodeHeartbeat.(*internalversion.Stage)
if !ok {
return nil, fmt.Errorf("failed to convert node init to stage")
}
nodeStages = append(nodeStages, nodeHeartbeatStage)
return nodeStages, nil
}

Expand Down
44 changes: 28 additions & 16 deletions pkg/kwokctl/runtime/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (

"sigs.k8s.io/kwok/kustomize/crd"
nodefast "sigs.k8s.io/kwok/kustomize/stage/node/fast"
heartbeat "sigs.k8s.io/kwok/kustomize/stage/node/heartbeat"
nodeheartbeat "sigs.k8s.io/kwok/kustomize/stage/node/heartbeat"
nodeheartbeatwithlease "sigs.k8s.io/kwok/kustomize/stage/node/heartbeat-with-lease"
"sigs.k8s.io/kwok/pkg/apis/internalversion"
"sigs.k8s.io/kwok/pkg/apis/v1alpha1"
"sigs.k8s.io/kwok/pkg/config"
Expand Down Expand Up @@ -193,7 +194,7 @@ func (c *Cluster) Save(ctx context.Context) error {
conf.Options.Runtime != consts.RuntimeTypeKind &&
conf.Options.Runtime != consts.RuntimeTypeKindPodman &&
len(config.FilterWithTypeFromContext[*internalversion.Stage](ctx)) == 0 {
defaultStages, err := c.getDefaultStages(conf.Options.NodeStatusUpdateFrequencyMilliseconds, conf.Options.NodeLeaseDurationSeconds == 0)
defaultStages, err := c.getDefaultStages(conf.Options.NodeStatusUpdateFrequencyMilliseconds, conf.Options.NodeLeaseDurationSeconds != 0)
if err != nil {
return err
}
Expand All @@ -203,7 +204,7 @@ func (c *Cluster) Save(ctx context.Context) error {
return config.Save(ctx, c.GetWorkdirPath(ConfigName), objs)
}

func (c *Cluster) getDefaultStages(updateFrequency int64, nodeHeartbeat bool) ([]config.InternalObject, error) {
func (c *Cluster) getDefaultStages(updateFrequency int64, lease bool) ([]config.InternalObject, error) {
objs := []config.InternalObject{}

nodeInit, err := config.Unmarshal([]byte(nodefast.DefaultNodeInit))
Expand All @@ -215,23 +216,34 @@ func (c *Cluster) getDefaultStages(updateFrequency int64, nodeHeartbeat bool) ([
}
objs = append(objs, nodeInit)

if nodeHeartbeat {
nodeHeartbeat, err := config.Unmarshal([]byte(heartbeat.DefaultNodeHeartbeat))
if err != nil {
return nil, err
}
rawHeartbeat := nodeheartbeat.DefaultNodeHeartbeat
if lease {
rawHeartbeat = nodeheartbeatwithlease.DefaultNodeHeartbeatWithLease
}

nodeHeartbeatStage, ok := nodeHeartbeat.(*internalversion.Stage)
if !ok {
return nil, fmt.Errorf("failed to get node heartbeat stage %T", nodeHeartbeat)
nodeHeartbeat, err := config.Unmarshal([]byte(rawHeartbeat))
if err != nil {
return nil, err
}

nodeHeartbeatStage, ok := nodeHeartbeat.(*internalversion.Stage)
if !ok {
return nil, fmt.Errorf("failed to get node nodeheartbeat stage %T", nodeHeartbeat)
}
if updateFrequency > 0 {
durationMilliseconds := format.ElemOrDefault(nodeHeartbeatStage.Spec.Delay.DurationMilliseconds)
jitterDurationMilliseconds := format.ElemOrDefault(nodeHeartbeatStage.Spec.Delay.JitterDurationMilliseconds)
if updateFrequency > durationMilliseconds {
durationMilliseconds = updateFrequency
}
if updateFrequency > 0 {
nodeHeartbeatStage.Spec.Delay.DurationMilliseconds = format.Ptr(updateFrequency)
nodeHeartbeatStage.Spec.Delay.JitterDurationMilliseconds = format.Ptr(updateFrequency + updateFrequency/10)
if jitterUpdateFrequency := updateFrequency + updateFrequency/10; jitterUpdateFrequency > jitterDurationMilliseconds {
jitterDurationMilliseconds = jitterUpdateFrequency
}

objs = append(objs, nodeHeartbeatStage)
nodeHeartbeatStage.Spec.Delay.DurationMilliseconds = format.Ptr(durationMilliseconds)
nodeHeartbeatStage.Spec.Delay.JitterDurationMilliseconds = format.Ptr(jitterDurationMilliseconds)
}

objs = append(objs, nodeHeartbeatStage)
return objs, nil
}

Expand Down

0 comments on commit f0b9c44

Please sign in to comment.