diff --git a/deploy/operator.yaml b/deploy/operator.yaml
index 8d9f88cf2..576dd3f34 100644
--- a/deploy/operator.yaml
+++ b/deploy/operator.yaml
@@ -41,6 +41,8 @@ spec:
         image: $SRIOV_NETWORK_OPERATOR_IMAGE
         command:
         - sriov-network-operator
+        args:
+        - --leader-elect=$OPERATOR_LEADER_ELECTION_ENABLE
         resources:
           requests:
             cpu: 100m
diff --git a/deploy/role.yaml b/deploy/role.yaml
index 409286ab5..a24f13729 100644
--- a/deploy/role.yaml
+++ b/deploy/role.yaml
@@ -56,6 +56,12 @@ rules:
   - get
   - list
  - watch
+- apiGroups:
+  - 'coordination.k8s.io'
+  resources:
+  - 'leases'
+  verbs:
+  - '*'
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: Role
diff --git a/deployment/sriov-network-operator/templates/role.yaml b/deployment/sriov-network-operator/templates/role.yaml
index 6058a86e1..29cf80cce 100644
--- a/deployment/sriov-network-operator/templates/role.yaml
+++ b/deployment/sriov-network-operator/templates/role.yaml
@@ -59,6 +59,12 @@ rules:
       - get
      - list
       - watch
+  - apiGroups:
+      - 'coordination.k8s.io'
+    resources:
+      - 'leases'
+    verbs:
+      - '*'
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: Role
diff --git a/hack/deploy-wait.sh b/hack/deploy-wait.sh
index d4e66281e..5a0af6023 100755
--- a/hack/deploy-wait.sh
+++ b/hack/deploy-wait.sh
@@ -20,6 +20,7 @@ done
 
 if ! $ready; then
     echo "Timed out waiting for features to be ready"
-    oc get nodes
+    kubectl get nodes
+    kubectl cluster-info dump -n ${NAMESPACE}
     exit 1
 fi
diff --git a/hack/env.sh b/hack/env.sh
index 26d9c767e..5b08d7b8b 100755
--- a/hack/env.sh
+++ b/hack/env.sh
@@ -34,3 +34,4 @@ export ADMISSION_CONTROLLERS_CERTIFICATES_CERT_MANAGER_ENABLED=${ADMISSION_CONTR
 export ADMISSION_CONTROLLERS_CERTIFICATES_OPERATOR_CA_CRT=${ADMISSION_CONTROLLERS_CERTIFICATES_OPERATOR_CA_CRT:-""}
 export ADMISSION_CONTROLLERS_CERTIFICATES_INJECTOR_CA_CRT=${ADMISSION_CONTROLLERS_CERTIFICATES_INJECTOR_CA_CRT:-""}
 export DEV_MODE=${DEV_MODE:-"FALSE"}
+export OPERATOR_LEADER_ELECTION_ENABLE=${OPERATOR_LEADER_ELECTION_ENABLE:-"false"}
diff --git a/hack/run-e2e-conformance-virtual-ocp.sh b/hack/run-e2e-conformance-virtual-ocp.sh
index 03d788e0d..20859c669 100755
--- a/hack/run-e2e-conformance-virtual-ocp.sh
+++ b/hack/run-e2e-conformance-virtual-ocp.sh
@@ -189,6 +189,7 @@ export OPERATOR_EXEC=kubectl
 export CLUSTER_TYPE=openshift
 export DEV_MODE=TRUE
 export CLUSTER_HAS_EMULATED_PF=TRUE
+export OPERATOR_LEADER_ELECTION_ENABLE=true
 
 export SRIOV_NETWORK_OPERATOR_IMAGE="$registry/$NAMESPACE/sriov-network-operator:latest"
 export SRIOV_NETWORK_CONFIG_DAEMON_IMAGE="$registry/$NAMESPACE/sriov-network-config-daemon:latest"
diff --git a/main.go b/main.go
index 857ac628d..ba3a41c68 100644
--- a/main.go
+++ b/main.go
@@ -50,6 +50,7 @@ import (
 	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/featuregate"
 	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/leaderelection"
+	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
 	snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log"
 	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
 	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
@@ -101,22 +102,42 @@ func main() {
 
 	le := leaderelection.GetLeaderElectionConfig(kubeClient, enableLeaderElection)
 
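+	// A dedicated manager that only runs leader election (when enabled) and serves the
+	// health/readiness probes; the controller managers below are started only after this
+	// instance has been elected.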
+	leaderElectionMgr, err := ctrl.NewManager(restConfig, ctrl.Options{
+		Scheme:                        scheme,
+		HealthProbeBindAddress:        probeAddr,
+		Metrics:                       server.Options{BindAddress: "0"},
+		LeaderElection:                enableLeaderElection,
+		LeaseDuration:                 &le.LeaseDuration,
+		LeaderElectionReleaseOnCancel: true,
+		RenewDeadline:                 &le.RenewDeadline,
+		RetryPeriod:                   &le.RetryPeriod,
+		LeaderElectionID:              consts.LeaderElectionID,
+	})
+	if err != nil {
+		setupLog.Error(err, "unable to start leader election manager")
+		os.Exit(1)
+	}
+
+	if err := leaderElectionMgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
+		setupLog.Error(err, "unable to set up health check")
+		os.Exit(1)
+	}
+	if err := leaderElectionMgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
+		setupLog.Error(err, "unable to set up ready check")
+		os.Exit(1)
+	}
+
 	mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
-		Scheme:                 scheme,
-		Metrics:                server.Options{BindAddress: metricsAddr},
-		WebhookServer:          webhook.NewServer(webhook.Options{Port: 9443}),
-		HealthProbeBindAddress: probeAddr,
-		LeaderElection:         enableLeaderElection,
-		LeaseDuration:          &le.LeaseDuration,
-		RenewDeadline:          &le.RenewDeadline,
-		RetryPeriod:            &le.RetryPeriod,
-		LeaderElectionID:       "a56def2a.openshift.io",
-		Cache:                  cache.Options{DefaultNamespaces: map[string]cache.Config{vars.Namespace: {}}},
+		Scheme:        scheme,
+		Metrics:       server.Options{BindAddress: metricsAddr},
+		WebhookServer: webhook.NewServer(webhook.Options{Port: 9443}),
+		Cache:         cache.Options{DefaultNamespaces: map[string]cache.Config{vars.Namespace: {}}},
 	})
 	if err != nil {
 		setupLog.Error(err, "unable to start manager")
 		os.Exit(1)
 	}
+
 	mgrGlobal, err := ctrl.NewManager(restConfig, ctrl.Options{
 		Scheme:  scheme,
 		Metrics: server.Options{BindAddress: "0"},
@@ -230,29 +251,86 @@ func main() {
 	}
 	// +kubebuilder:scaffold:builder
 
-	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
-		setupLog.Error(err, "unable to set up health check")
-		os.Exit(1)
-	}
-	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
-		setupLog.Error(err, "unable to set up ready check")
+	leaderElectionErr := make(chan error)
+	leaderElectionContext, cancelLeaderElection := context.WithCancel(context.Background())
+	go func() {
+		setupLog.Info("starting leader election manager")
+		leaderElectionErr <- leaderElectionMgr.Start(leaderElectionContext)
+	}()
+
+	select {
+	case <-leaderElectionMgr.Elected():
+	case err := <-leaderElectionErr:
+		setupLog.Error(err, "Leader Election Manager error")
 		os.Exit(1)
 	}
 
-	stopCh := ctrl.SetupSignalHandler()
+	setupLog.Info("acquired lease")
+
+	stopSignalCh := ctrl.SetupSignalHandler()
+
+	globalManagerErr := make(chan error)
+	globalManagerCtx, globalManagerCancel := context.WithCancel(context.Background())
 	go func() {
-		if err := mgrGlobal.Start(stopCh); err != nil {
-			setupLog.Error(err, "Manager Global exited non-zero")
-			os.Exit(1)
-		}
+		setupLog.Info("starting global manager")
+		globalManagerErr <- mgrGlobal.Start(globalManagerCtx)
 	}()
 
-	// Remove all finalizers after controller is shut down
-	defer utils.Shutdown()
+	namespacedManagerErr := make(chan error)
+	namespacedManagerCtx, namespacedManagerCancel := context.WithCancel(context.Background())
+	go func() {
+		setupLog.Info("starting namespaced manager")
+		namespacedManagerErr <- mgr.Start(namespacedManagerCtx)
+	}()
+
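+	// Block until a stop signal arrives or one of the managers fails. On a graceful
+	// shutdown the controllers are stopped first, finalizers are removed via
+	// utils.Shutdown(), and the lease is released last (LeaderElectionReleaseOnCancel)
+	// so a replacement pod can take over quickly.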
+	select {
+	// Wait for a stop signal
+	case <-stopSignalCh.Done():
+		setupLog.Info("Stop signal received")
+
+		globalManagerCancel()
+		namespacedManagerCancel()
+		<-globalManagerErr
+		<-namespacedManagerErr
+
+		utils.Shutdown()
+
+		cancelLeaderElection()
+		<-leaderElectionErr
+
+	case err := <-leaderElectionErr:
+		setupLog.Error(err, "Leader Election Manager error")
+		globalManagerCancel()
+		namespacedManagerCancel()
+		<-globalManagerErr
+		<-namespacedManagerErr
+
+		os.Exit(1)
+
+	case err := <-globalManagerErr:
+		setupLog.Error(err, "Global Manager error")
+
+		namespacedManagerCancel()
+		<-namespacedManagerErr
+
+		utils.Shutdown()
+
+		cancelLeaderElection()
+		<-leaderElectionErr
+
+		os.Exit(1)
+
+	case err := <-namespacedManagerErr:
+		setupLog.Error(err, "Namespaced Manager error")
+
+		globalManagerCancel()
+		<-globalManagerErr
+
+		utils.Shutdown()
+
+		cancelLeaderElection()
+		<-leaderElectionErr
 
-	setupLog.Info("starting manager")
-	if err := mgr.Start(stopCh); err != nil {
-		setupLog.Error(err, "problem running manager")
 		os.Exit(1)
 	}
 }
diff --git a/pkg/consts/constants.go b/pkg/consts/constants.go
index efbefe198..8e5e1fdcf 100644
--- a/pkg/consts/constants.go
+++ b/pkg/consts/constants.go
@@ -35,6 +35,7 @@ const (
 	ServiceAccount                 = "ServiceAccount"
 	DPConfigFileName               = "config.json"
 	OVSHWOLMachineConfigNameSuffix = "ovs-hw-offload"
+	LeaderElectionID               = "a56def2a.openshift.io"
 
 	LinkTypeEthernet   = "ether"
 	LinkTypeInfiniband = "infiniband"
diff --git a/test/conformance/tests/test_sriov_operator.go b/test/conformance/tests/test_sriov_operator.go
index 094a95cf0..2a40559e4 100644
--- a/test/conformance/tests/test_sriov_operator.go
+++ b/test/conformance/tests/test_sriov_operator.go
@@ -28,6 +28,7 @@ import (
 	runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
 
 	sriovv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
+	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
 	"github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/clean"
 	"github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/cluster"
 	"github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/discovery"
@@ -276,6 +277,35 @@ var _ = Describe("[sriov] operator", func() {
 				}, 3*time.Minute, 5*time.Second).Should(Succeed())
 			})
 		})
+
+		It("should gracefully restart quickly", func() {
+			// This test case ensures the leader election process runs smoothly when the operator's pod is restarted.
+			oldLease, err := clients.CoordinationV1Interface.Leases(operatorNamespace).Get(context.Background(), consts.LeaderElectionID, metav1.GetOptions{})
+			if k8serrors.IsNotFound(err) {
+				Skip("Leader Election is not enabled on the cluster. Skipping")
+			}
+			Expect(err).ToNot(HaveOccurred())
+
+			oldOperatorPod := getOperatorPod()
+
+			By("Delete the operator's pod")
+			deleteOperatorPod()
+
+			By("Wait for the new operator's pod to start")
+			Eventually(func(g Gomega) {
+				newOperatorPod := getOperatorPod()
+				g.Expect(newOperatorPod.Name).ToNot(Equal(oldOperatorPod.Name))
+				g.Expect(newOperatorPod.Status.Phase).To(Equal(corev1.PodRunning))
+			}, 45*time.Second, 5*time.Second).Should(Succeed())
+
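+			// The old pod releases the lease on graceful shutdown (LeaderElectionReleaseOnCancel),
+			// so the new pod should not have to wait for the previous lease to expire.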
Skipping") + } + Expect(err).ToNot(HaveOccurred()) + + oldOperatorPod := getOperatorPod() + + By("Delete the operator's pod") + deleteOperatorPod() + + By("Wait the new operator's pod to start") + Eventually(func(g Gomega) { + newOperatorPod := getOperatorPod() + Expect(newOperatorPod.Name).ToNot(Equal(oldOperatorPod.Name)) + Expect(newOperatorPod.Status.Phase).To(Equal(corev1.PodRunning)) + }, 45*time.Second, 5*time.Second) + + By("Assert the new operator's pod acquire the lease before 30 seconds") + Eventually(func(g Gomega) { + newLease, err := clients.CoordinationV1Interface.Leases(operatorNamespace).Get(context.Background(), consts.LeaderElectionID, metav1.GetOptions{}) + g.Expect(err).ToNot(HaveOccurred()) + + g.Expect(newLease.Spec.HolderIdentity).ToNot(Equal(oldLease.Spec.HolderIdentity)) + }, 30*time.Second, 5*time.Second).Should(Succeed()) + }) }) Describe("Generic SriovNetworkNodePolicy", func() { @@ -2675,14 +2705,17 @@ func getOperatorConfigLogLevel() int { return cfg.Spec.LogLevel } -func getOperatorLogs(since time.Time) []string { +func getOperatorPod() corev1.Pod { podList, err := clients.Pods(operatorNamespace).List(context.Background(), metav1.ListOptions{ LabelSelector: "name=sriov-network-operator", }) ExpectWithOffset(1, err).ToNot(HaveOccurred()) - ExpectWithOffset(1, podList.Items).To(HaveLen(1), "One operator pod expected") + ExpectWithOffset(1, podList.Items).ToNot(HaveLen(0), "At least one operator pod expected") + return podList.Items[0] +} - pod := podList.Items[0] +func getOperatorLogs(since time.Time) []string { + pod := getOperatorPod() logStart := metav1.NewTime(since) rawLogs, err := clients.Pods(pod.Namespace). GetLogs(pod.Name, &corev1.PodLogOptions{ @@ -2695,6 +2728,13 @@ func getOperatorLogs(since time.Time) []string { return strings.Split(string(rawLogs), "\n") } +func deleteOperatorPod() { + pod := getOperatorPod() + + err := clients.Pods(operatorNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}) + ExpectWithOffset(1, err).ToNot(HaveOccurred()) +} + func assertObjectIsNotFound(name string, obj runtimeclient.Object) { Eventually(func() bool { err := clients.Get(context.Background(), runtimeclient.ObjectKey{Name: name, Namespace: operatorNamespace}, obj) diff --git a/test/util/client/clients.go b/test/util/client/clients.go index a96634c19..21eb12053 100644 --- a/test/util/client/clients.go +++ b/test/util/client/clients.go @@ -11,6 +11,7 @@ import ( discovery "k8s.io/client-go/discovery" clientgoscheme "k8s.io/client-go/kubernetes/scheme" appsv1client "k8s.io/client-go/kubernetes/typed/apps/v1" + coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -37,6 +38,7 @@ type ClientSet struct { clientsriovv1.SriovnetworkV1Interface Config *rest.Config runtimeclient.Client + coordinationv1.CoordinationV1Interface } // New returns a *ClientBuilder with the given kubeconfig. @@ -67,6 +69,7 @@ func New(kubeconfig string) *ClientSet { clientSet.AppsV1Interface = appsv1client.NewForConfigOrDie(config) clientSet.DiscoveryInterface = discovery.NewDiscoveryClientForConfigOrDie(config) clientSet.SriovnetworkV1Interface = clientsriovv1.NewForConfigOrDie(config) + clientSet.CoordinationV1Interface = coordinationv1.NewForConfigOrDie(config) clientSet.Config = config crScheme := runtime.NewScheme()