Merge pull request #556 from zeeke/fast-leader-election
operator: LeaderElectionReleaseOnCancel
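Leader election becomes opt-in through a --leader-elect flag (default "false") and, when enabled, runs in a dedicated manager with LeaderElectionReleaseOnCancel set: on shutdown the operator first stops the global and namespaced managers and removes finalizers, then cancels the election context so the Lease is released immediately and a replacement pod can take over without waiting for the lease to expire. The diff below also adds the RBAC needed for coordination.k8s.io leases and an e2e test that restarts the operator pod.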
SchSeba authored May 2, 2024
2 parents 047114b + 0ef001c commit c2d9e32
Showing 10 changed files with 169 additions and 30 deletions.
2 changes: 2 additions & 0 deletions deploy/operator.yaml
@@ -41,6 +41,8 @@ spec:
         image: $SRIOV_NETWORK_OPERATOR_IMAGE
         command:
         - sriov-network-operator
+        args:
+        - --leader-elect=$OPERATOR_LEADER_ELECTION_ENABLE
         resources:
           requests:
             cpu: 100m
6 changes: 6 additions & 0 deletions deploy/role.yaml
@@ -56,6 +56,12 @@ rules:
   - get
   - list
   - watch
+- apiGroups:
+  - 'coordination.k8s.io'
+  resources:
+  - 'leases'
+  verbs:
+  - '*'
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: Role
6 changes: 6 additions & 0 deletions deployment/sriov-network-operator/templates/role.yaml
@@ -59,6 +59,12 @@ rules:
   - get
   - list
   - watch
+- apiGroups:
+  - 'coordination.k8s.io'
+  resources:
+  - 'leases'
+  verbs:
+  - '*'
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: Role
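Both Role updates above are needed because controller-runtime's leader election stores its state in a Lease object in the coordination.k8s.io API group, which the operator must be able to create, read, and update. To watch the handover during a restart, something like kubectl get lease a56def2a.openshift.io -n <operator namespace> -w shows the holder change (the namespace placeholder is illustrative; use wherever the operator is deployed).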
3 changes: 2 additions & 1 deletion hack/deploy-wait.sh
@@ -20,6 +20,7 @@ done

 if ! $ready; then
   echo "Timed out waiting for features to be ready"
-  oc get nodes
+  kubectl get nodes
+  kubectl cluster-info dump -n ${NAMESPACE}
   exit 1
 fi
1 change: 1 addition & 0 deletions hack/env.sh
@@ -34,3 +34,4 @@ export ADMISSION_CONTROLLERS_CERTIFICATES_CERT_MANAGER_ENABLED=${ADMISSION_CONTR
 export ADMISSION_CONTROLLERS_CERTIFICATES_OPERATOR_CA_CRT=${ADMISSION_CONTROLLERS_CERTIFICATES_OPERATOR_CA_CRT:-""}
 export ADMISSION_CONTROLLERS_CERTIFICATES_INJECTOR_CA_CRT=${ADMISSION_CONTROLLERS_CERTIFICATES_INJECTOR_CA_CRT:-""}
 export DEV_MODE=${DEV_MODE:-"FALSE"}
+export OPERATOR_LEADER_ELECTION_ENABLE=${OPERATOR_LEADER_ELECTION_ENABLE:-"false"}
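Since the default is "false", existing deployments keep their previous single-pod behavior; to opt in, export OPERATOR_LEADER_ELECTION_ENABLE=true in the environment before running the repo's deploy scripts, which template deploy/operator.yaml above.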
1 change: 1 addition & 0 deletions hack/run-e2e-conformance-virtual-ocp.sh
@@ -189,6 +189,7 @@ export OPERATOR_EXEC=kubectl
 export CLUSTER_TYPE=openshift
 export DEV_MODE=TRUE
 export CLUSTER_HAS_EMULATED_PF=TRUE
+export OPERATOR_LEADER_ELECTION_ENABLE=true
 
 export SRIOV_NETWORK_OPERATOR_IMAGE="$registry/$NAMESPACE/sriov-network-operator:latest"
 export SRIOV_NETWORK_CONFIG_DAEMON_IMAGE="$registry/$NAMESPACE/sriov-network-config-daemon:latest"
130 changes: 104 additions & 26 deletions main.go
@@ -50,6 +50,7 @@ import (
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/leaderelection"

"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
@@ -101,22 +102,42 @@ func main() {

 	le := leaderelection.GetLeaderElectionConfig(kubeClient, enableLeaderElection)
 
+	leaderElectionMgr, err := ctrl.NewManager(restConfig, ctrl.Options{
+		Scheme:                        scheme,
+		HealthProbeBindAddress:        probeAddr,
+		Metrics:                       server.Options{BindAddress: "0"},
+		LeaderElection:                enableLeaderElection,
+		LeaseDuration:                 &le.LeaseDuration,
+		LeaderElectionReleaseOnCancel: true,
+		RenewDeadline:                 &le.RenewDeadline,
+		RetryPeriod:                   &le.RetryPeriod,
+		LeaderElectionID:              consts.LeaderElectionID,
+	})
+	if err != nil {
+		setupLog.Error(err, "unable to start leader election manager")
+		os.Exit(1)
+	}
+
+	if err := leaderElectionMgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
+		setupLog.Error(err, "unable to set up health check")
+		os.Exit(1)
+	}
+	if err := leaderElectionMgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
+		setupLog.Error(err, "unable to set up ready check")
+		os.Exit(1)
+	}
+
 	mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
-		Scheme:                 scheme,
-		Metrics:                server.Options{BindAddress: metricsAddr},
-		WebhookServer:          webhook.NewServer(webhook.Options{Port: 9443}),
-		HealthProbeBindAddress: probeAddr,
-		LeaderElection:         enableLeaderElection,
-		LeaseDuration:          &le.LeaseDuration,
-		RenewDeadline:          &le.RenewDeadline,
-		RetryPeriod:            &le.RetryPeriod,
-		LeaderElectionID:       "a56def2a.openshift.io",
-		Cache:                  cache.Options{DefaultNamespaces: map[string]cache.Config{vars.Namespace: {}}},
+		Scheme:        scheme,
+		Metrics:       server.Options{BindAddress: metricsAddr},
+		WebhookServer: webhook.NewServer(webhook.Options{Port: 9443}),
+		Cache:         cache.Options{DefaultNamespaces: map[string]cache.Config{vars.Namespace: {}}},
 	})
 	if err != nil {
 		setupLog.Error(err, "unable to start manager")
 		os.Exit(1)
 	}
 
 	mgrGlobal, err := ctrl.NewManager(restConfig, ctrl.Options{
 		Scheme:  scheme,
 		Metrics: server.Options{BindAddress: "0"},
@@ -230,29 +251,86 @@ func main() {
 	}
 	// +kubebuilder:scaffold:builder
 
-	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
-		setupLog.Error(err, "unable to set up health check")
-		os.Exit(1)
-	}
-	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
-		setupLog.Error(err, "unable to set up ready check")
+	leaderElectionErr := make(chan error)
+	leaderElectionContext, cancelLeaderElection := context.WithCancel(context.Background())
+	go func() {
+		setupLog.Info("starting leader election manager")
+		leaderElectionErr <- leaderElectionMgr.Start(leaderElectionContext)
+	}()
+
+	select {
+	case <-leaderElectionMgr.Elected():
+	case err := <-leaderElectionErr:
+		setupLog.Error(err, "Leader Election Manager error")
 		os.Exit(1)
 	}
 
-	stopCh := ctrl.SetupSignalHandler()
+	setupLog.Info("acquired lease")
+
+	stopSignalCh := ctrl.SetupSignalHandler()
+
+	globalManagerErr := make(chan error)
+	globalManagerCtx, globalManagerCancel := context.WithCancel(context.Background())
 	go func() {
-		if err := mgrGlobal.Start(stopCh); err != nil {
-			setupLog.Error(err, "Manager Global exited non-zero")
-			os.Exit(1)
-		}
+		setupLog.Info("starting global manager")
+		globalManagerErr <- mgrGlobal.Start(globalManagerCtx)
 	}()
 
-	// Remove all finalizers after controller is shut down
-	defer utils.Shutdown()
+	namespacedManagerErr := make(chan error)
+	namespacedManagerCtx, namespacedManagerCancel := context.WithCancel(context.Background())
+	go func() {
+		setupLog.Info("starting namespaced manager")
+		namespacedManagerErr <- mgr.Start(namespacedManagerCtx)
+	}()
+
+	select {
+	// Wait for a stop signal
+	case <-stopSignalCh.Done():
+		setupLog.Info("Stop signal received")
+
+		globalManagerCancel()
+		namespacedManagerCancel()
+		<-globalManagerErr
+		<-namespacedManagerErr
+
+		utils.Shutdown()
+
+		cancelLeaderElection()
+		<-leaderElectionErr
+
+	case err := <-leaderElectionErr:
+		setupLog.Error(err, "Leader Election Manager error")
+
+		globalManagerCancel()
+		namespacedManagerCancel()
+		<-globalManagerErr
+		<-namespacedManagerErr
+
+		os.Exit(1)
+
+	case err := <-globalManagerErr:
+		setupLog.Error(err, "Global Manager error")
+
+		namespacedManagerCancel()
+		<-namespacedManagerErr
+
+		utils.Shutdown()
+
+		cancelLeaderElection()
+		<-leaderElectionErr
+
+		os.Exit(1)
+
+	case err := <-namespacedManagerErr:
+		setupLog.Error(err, "Namespaced Manager error")
+
+		globalManagerCancel()
+		<-globalManagerErr
+
+		utils.Shutdown()
+
+		cancelLeaderElection()
+		<-leaderElectionErr
+
-	setupLog.Info("starting manager")
-	if err := mgr.Start(stopCh); err != nil {
-		setupLog.Error(err, "problem running manager")
 		os.Exit(1)
 	}
 }
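For readers skimming the diff, the new main.go flow is a two-phase startup: a manager whose only job is to hold the leader Lease (with LeaderElectionReleaseOnCancel), the real managers gated on Elected(), and an explicit lease release only after every worker has stopped. Below is a minimal, self-contained sketch of that pattern; the election ID, namespace, and structure are illustrative, not the operator's actual code.

package main

import (
	"context"
	"os"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

func main() {
	// Phase 1: a manager that only campaigns for the leader Lease.
	leaderMgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		LeaderElection:                true,
		LeaderElectionID:              "example.my-operator.io", // illustrative ID
		LeaderElectionNamespace:       "default",                // illustrative namespace
		LeaderElectionReleaseOnCancel: true,                     // step down when its context is canceled
		Metrics:                       server.Options{BindAddress: "0"}, // no metrics listener
	})
	if err != nil {
		os.Exit(1)
	}

	leaderCtx, releaseLease := context.WithCancel(context.Background())
	leaderErr := make(chan error, 1)
	go func() { leaderErr <- leaderMgr.Start(leaderCtx) }()

	// Block until this process holds the lease (or the election fails).
	select {
	case <-leaderMgr.Elected():
	case <-leaderErr:
		os.Exit(1)
	}

	// Phase 2: start the real managers/workers here and wait for them to stop.

	// Only after all workers are done: cancel the election manager so it
	// releases the Lease and the next pod can acquire it immediately.
	releaseLease()
	<-leaderErr
}

The ordering is the point of the patch: releasing the lease before the workers stop would let a new pod begin reconciling while the old one is still shutting down, which is why cancelLeaderElection() always comes last in the diff above.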
1 change: 1 addition & 0 deletions pkg/consts/constants.go
@@ -35,6 +35,7 @@ const (
 	ServiceAccount                 = "ServiceAccount"
 	DPConfigFileName               = "config.json"
 	OVSHWOLMachineConfigNameSuffix = "ovs-hw-offload"
+	LeaderElectionID               = "a56def2a.openshift.io"
 
 	LinkTypeEthernet   = "ether"
 	LinkTypeInfiniband = "infiniband"
46 changes: 43 additions & 3 deletions test/conformance/tests/test_sriov_operator.go
@@ -28,6 +28,7 @@ import (
 	runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
 
 	sriovv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
+	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
 	"github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/clean"
 	"github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/cluster"
 	"github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/discovery"
@@ -276,6 +277,35 @@ var _ = Describe("[sriov] operator", func() {
 			}, 3*time.Minute, 5*time.Second).Should(Succeed())
 		})
 	})
+
+	It("should gracefully restart quickly", func() {
+		// This test ensures the leader election process runs smoothly when the operator's pod is restarted.
+		oldLease, err := clients.CoordinationV1Interface.Leases(operatorNamespace).Get(context.Background(), consts.LeaderElectionID, metav1.GetOptions{})
+		if k8serrors.IsNotFound(err) {
+			Skip("Leader Election is not enabled on the cluster. Skipping")
+		}
+		Expect(err).ToNot(HaveOccurred())
+
+		oldOperatorPod := getOperatorPod()
+
+		By("Delete the operator's pod")
+		deleteOperatorPod()
+
+		By("Wait for the new operator's pod to start")
+		Eventually(func(g Gomega) {
+			newOperatorPod := getOperatorPod()
+			g.Expect(newOperatorPod.Name).ToNot(Equal(oldOperatorPod.Name))
+			g.Expect(newOperatorPod.Status.Phase).To(Equal(corev1.PodRunning))
+		}, 45*time.Second, 5*time.Second).Should(Succeed())
+
+		By("Assert the new operator's pod acquires the lease within 30 seconds")
+		Eventually(func(g Gomega) {
+			newLease, err := clients.CoordinationV1Interface.Leases(operatorNamespace).Get(context.Background(), consts.LeaderElectionID, metav1.GetOptions{})
+			g.Expect(err).ToNot(HaveOccurred())
+
+			g.Expect(newLease.Spec.HolderIdentity).ToNot(Equal(oldLease.Spec.HolderIdentity))
+		}, 30*time.Second, 5*time.Second).Should(Succeed())
+	})
 })
 
 Describe("Generic SriovNetworkNodePolicy", func() {
@@ -2675,14 +2705,17 @@ func getOperatorConfigLogLevel() int {
 	return cfg.Spec.LogLevel
 }
 
-func getOperatorLogs(since time.Time) []string {
+func getOperatorPod() corev1.Pod {
 	podList, err := clients.Pods(operatorNamespace).List(context.Background(), metav1.ListOptions{
 		LabelSelector: "name=sriov-network-operator",
 	})
 	ExpectWithOffset(1, err).ToNot(HaveOccurred())
-	ExpectWithOffset(1, podList.Items).To(HaveLen(1), "One operator pod expected")
+	ExpectWithOffset(1, podList.Items).ToNot(HaveLen(0), "At least one operator pod expected")
+	return podList.Items[0]
+}
 
-	pod := podList.Items[0]
+func getOperatorLogs(since time.Time) []string {
+	pod := getOperatorPod()
 	logStart := metav1.NewTime(since)
 	rawLogs, err := clients.Pods(pod.Namespace).
 		GetLogs(pod.Name, &corev1.PodLogOptions{
@@ -2695,6 +2728,13 @@ func getOperatorLogs(since time.Time) []string {
 	return strings.Split(string(rawLogs), "\n")
 }
 
+func deleteOperatorPod() {
+	pod := getOperatorPod()
+
+	err := clients.Pods(operatorNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
+	ExpectWithOffset(1, err).ToNot(HaveOccurred())
+}
+
 func assertObjectIsNotFound(name string, obj runtimeclient.Object) {
 	Eventually(func() bool {
 		err := clients.Get(context.Background(), runtimeclient.ObjectKey{Name: name, Namespace: operatorNamespace}, obj)
3 changes: 3 additions & 0 deletions test/util/client/clients.go
@@ -11,6 +11,7 @@ import (
 	discovery "k8s.io/client-go/discovery"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	appsv1client "k8s.io/client-go/kubernetes/typed/apps/v1"
+	coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
 	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
@@ -37,6 +38,7 @@ type ClientSet struct {
 	clientsriovv1.SriovnetworkV1Interface
 	Config *rest.Config
 	runtimeclient.Client
+	coordinationv1.CoordinationV1Interface
 }
 
 // New returns a *ClientBuilder with the given kubeconfig.
@@ -67,6 +69,7 @@ func New(kubeconfig string) *ClientSet {
 	clientSet.AppsV1Interface = appsv1client.NewForConfigOrDie(config)
 	clientSet.DiscoveryInterface = discovery.NewDiscoveryClientForConfigOrDie(config)
 	clientSet.SriovnetworkV1Interface = clientsriovv1.NewForConfigOrDie(config)
+	clientSet.CoordinationV1Interface = coordinationv1.NewForConfigOrDie(config)
 	clientSet.Config = config
 
 	crScheme := runtime.NewScheme()