Skip to content

Commit

Permalink
添加leaderElection,支持多副本高可用部署
Browse files Browse the repository at this point in the history
  • Loading branch information
silenceper committed Mar 3, 2020
1 parent 9a0b3b8 commit 57029ea
Show file tree
Hide file tree
Showing 5 changed files with 327 additions and 55 deletions.
21 changes: 21 additions & 0 deletions common/kubernetes/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package kubernetes

import (
clientv1 "k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
kuberecord "k8s.io/client-go/tools/record"
"k8s.io/klog"
)

// CreateEventRecorder creates an event recorder to send custom events to Kubernetes to be recorded for targeted Kubernetes objects
// CreateEventRecorder creates an event recorder to send custom events to Kubernetes to be recorded for targeted Kubernetes objects
func CreateEventRecorder(kubeClient clientset.Interface) kuberecord.EventRecorder {
	broadcaster := kuberecord.NewBroadcaster()
	broadcaster.StartLogging(klog.V(4).Infof)
	// Only wire the broadcaster to the API server when the client is real;
	// a fake clientset (used in tests) must not try to record events.
	if _, isFake := kubeClient.(*fake.Clientset); !isFake {
		sink := &corev1.EventSinkImpl{Interface: corev1.New(kubeClient.CoreV1().RESTClient()).Events("")}
		broadcaster.StartRecordingToSink(sink)
	}
	return broadcaster.NewRecorder(scheme.Scheme, clientv1.EventSource{Component: "kube-eventer"})
}
101 changes: 85 additions & 16 deletions deploy/deploy.yaml
Original file line number Diff line number Diff line change
@@ -1,28 +1,97 @@
apiVersion: extensions/v1beta1
apiVersion: apps/v1beta2
kind: Deployment
metadata:
labels:
name: kube-eventer
name: kube-eventer
namespace: kube-system
spec:
replicas: 1
replicas: 2
selector:
matchLabels:
app: kube-eventer
template:
metadata:
labels:
task: monitoring
k8s-app: kube-eventer
app: kube-eventer
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ''
spec:
serviceAccount: admin
dnsPolicy: ClusterFirstWithHostNet
serviceAccount: kube-eventer
containers:
- name: kube-eventer
image: registry.aliyuncs.com/acs/kube-eventer-amd64:v1.1.0-c93a835-aliyun
imagePullPolicy: IfNotPresent
env:
- name: TZ
value: "Asia/Shanghai"
command:
- /kube-eventer
- --source=kubernetes:https://kubernetes.default
## e.g., dingtalk sink demo
- --sink=dingtalk:[your_webhook_url]&label=[your_cluster_id]&level=[Normal or Warning(default)]
- image: silenceper/kube-eventer:v1.1.0-8e62038
name: kube-eventer
command:
- "/kube-eventer"
- "--source=kubernetes:https://kubernetes.default"
## e.g., dingtalk sink demo
- --sink=dingtalk:[your_webhook_url]&label=[your_cluster_id]&level=[Normal or Warning(default)]
env:
# If TZ is assigned, set the TZ value as the time zone
- name: TZ
value: "Asia/Shanghai"
volumeMounts:
- name: localtime
mountPath: /etc/localtime
readOnly: true
- name: zoneinfo
mountPath: /usr/share/zoneinfo
readOnly: true
resources:
requests:
cpu: 100m
memory: 100Mi
limits:
cpu: 500m
memory: 250Mi
volumes:
- name: localtime
hostPath:
path: /etc/localtime
- name: zoneinfo
hostPath:
path: /usr/share/zoneinfo
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: kube-eventer
rules:
- apiGroups:
- ""
resources:
- events
verbs:
- get
- list
- watch
- apiGroups:
- "coordination.k8s.io"
resources:
- leases
verbs:
- get
- create
- update
- delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
annotations:
name: kube-eventer
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: kube-eventer
subjects:
- kind: ServiceAccount
name: kube-eventer
namespace: kube-system
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: kube-eventer
namespace: kube-system
133 changes: 121 additions & 12 deletions eventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package main

import (
ctx "context"
"flag"
"fmt"
"net"
Expand All @@ -29,51 +30,116 @@ import (

"github.com/AliyunContainerService/kube-eventer/api"
"github.com/AliyunContainerService/kube-eventer/common/flags"
kubeconfig "github.com/AliyunContainerService/kube-eventer/common/kubernetes"
"github.com/AliyunContainerService/kube-eventer/manager"
"github.com/AliyunContainerService/kube-eventer/sinks"
"github.com/AliyunContainerService/kube-eventer/sources"
"github.com/AliyunContainerService/kube-eventer/version"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog"
)

var (
argFrequency = flag.Duration("frequency", 30*time.Second, "The resolution at which Eventer pushes events to sinks")
argMaxProcs = flag.Int("max_procs", 0, "max number of CPUs that can be used simultaneously. Less than 1 for default (number of cores)")
argSources flags.Uris
argFrequency = flag.Duration("frequency", 30*time.Second, "The resolution at which Eventer pushes events to sinks")
argMaxProcs = flag.Int("max_procs", 0, "max number of CPUs that can be used simultaneously. Less than 1 for default (number of cores)")
argSources flags.Uris

argSinks flags.Uris
argVersion bool
argHealthzIP = flag.String("healthz-ip", "0.0.0.0", "ip eventer health check service uses")
argHealthzPort = flag.Uint("healthz-port", 8084, "port eventer health check listens on")
namespace = flag.String("namespace", "kube-system", "Namespace in which kube-eventer run.")
)

func main() {
quitChannel := make(chan struct{}, 0)

klog.InitFlags(nil)
defer klog.Flush()

flag.Var(&argSources, "source", "source(s) to read events from")
flag.Var(&argSinks, "sink", "external sink(s) that receive events")
flag.BoolVar(&argVersion, "version", false, "print version info and exit")
leaderElection := defaultLeaderElectionConfiguration()
leaderElection.LeaderElect = true

bindLeaderElectionFlags(&leaderElection)

flag.Parse()

if argVersion {
fmt.Println(version.VersionInfo())
os.Exit(0)
}

setMaxProcs()

klog.Infof(strings.Join(os.Args, " "))
klog.Info(version.VersionInfo())

setMaxProcs()

go startHTTPServer()

if len(argSources) != 1 {
klog.Fatal("Wrong number of sources specified")
}

if err := validateFlags(); err != nil {
klog.Fatal(err)
}

// sources
if len(argSources) != 1 {
klog.Fatal("Wrong number of sources specified")
if !leaderElection.LeaderElect {
run()
} else {
id, err := os.Hostname()
if err != nil {
klog.Fatalf("Unable to get hostname: %v", err)
}

cfg, err := kubeconfig.GetKubeClientConfig(&argSources[0].Val)
if err != nil {
klog.Fatalf("Get KubeClientConfig Error: %v", err)
}
kubeClient := kubeclient.NewForConfigOrDie(cfg)

lock, err := resourcelock.New(
leaderElection.ResourceLock,
*namespace,
"kube-eventer",
kubeClient.CoreV1(),
kubeClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: kubeconfig.CreateEventRecorder(kubeClient),
},
)
if err != nil {
klog.Fatalf("Unable to create leader election lock: %v", err)
}

leaderelection.RunOrDie(ctx.TODO(), leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: leaderElection.LeaseDuration.Duration,
RenewDeadline: leaderElection.RenewDeadline.Duration,
RetryPeriod: leaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ ctx.Context) {
// Since we are committing a suicide after losing
// mastership, we can safely ignore the argument.
run()
},
OnStoppedLeading: func() {
klog.Fatalf("lost master")
},
},
})
}
}

func run() {
quitChannel := make(chan struct{}, 0)

sourceFactory := sources.NewSourceFactory()
sources, err := sourceFactory.BuildAll(argSources)
if err != nil {
Expand Down Expand Up @@ -107,8 +173,6 @@ func main() {
manager.Start()
klog.Infof("Starting eventer")

go startHTTPServer()

<-quitChannel
}

Expand Down Expand Up @@ -150,3 +214,48 @@ func setMaxProcs() {
klog.Warningf("Specified max procs of %d but using %d", numProcs, actualNumProcs)
}
}

// Default leader-election timings (same values as the Kubernetes
// component-base defaults used elsewhere in this file).
const (
// How long a lease is valid before the holder must renew it.
defaultLeaseDuration = 15 * time.Second
// How long the acting leader keeps retrying renewal before giving up.
defaultRenewDeadline = 10 * time.Second
// How long candidates wait between acquisition/renewal attempts.
defaultRetryPeriod = 2 * time.Second
)

// defaultLeaderElectionConfiguration builds the baseline leader-election
// settings: election disabled by default, lease-based resource locking, and
// the stock lease/renew/retry durations defined above.
func defaultLeaderElectionConfiguration() componentbaseconfig.LeaderElectionConfiguration {
	var cfg componentbaseconfig.LeaderElectionConfiguration
	cfg.LeaderElect = false
	cfg.LeaseDuration = metav1.Duration{Duration: defaultLeaseDuration}
	cfg.RenewDeadline = metav1.Duration{Duration: defaultRenewDeadline}
	cfg.RetryPeriod = metav1.Duration{Duration: defaultRetryPeriod}
	cfg.ResourceLock = resourcelock.LeasesResourceLock
	return cfg
}

// bindLeaderElectionFlags registers the standard leader-election command-line
// flags, using the values already present in l as defaults. The help texts
// are hoisted into named constants so each flag registration stays one line.
func bindLeaderElectionFlags(l *componentbaseconfig.LeaderElectionConfiguration) {
	const (
		electHelp = "Start a leader election client and gain leadership before " +
			"executing the main loop. Enable this when running replicated " +
			"components for high availability."
		leaseHelp = "The duration that non-leader candidates will wait after observing a leadership " +
			"renewal until attempting to acquire leadership of a led but unrenewed leader " +
			"slot. This is effectively the maximum duration that a leader can be stopped " +
			"before it is replaced by another candidate. This is only applicable if leader " +
			"election is enabled."
		renewHelp = "The interval between attempts by the acting master to renew a leadership slot " +
			"before it stops leading. This must be less than or equal to the lease duration. " +
			"This is only applicable if leader election is enabled."
		retryHelp = "The duration the clients should wait between attempting acquisition and renewal " +
			"of a leadership. This is only applicable if leader election is enabled."
		lockHelp = "The type of resource object that is used for locking during " +
			"leader election. Supported options are `leases`(default), `endpoints` and `configmaps`."
		nameHelp = "The name of resource object that is used for locking during " +
			"leader election."
		nsHelp = "The namespace of resource object that is used for locking during " +
			"leader election."
	)
	flag.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, electHelp)
	flag.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, leaseHelp)
	flag.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, renewHelp)
	flag.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, retryHelp)
	flag.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, lockHelp)
	flag.StringVar(&l.ResourceName, "leader-elect-resource-name", l.ResourceName, nameHelp)
	flag.StringVar(&l.ResourceNamespace, "leader-elect-resource-namespace", l.ResourceNamespace, nsHelp)
}
25 changes: 9 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,33 @@ require (
github.com/aws/aws-sdk-go v1.19.6
github.com/denverdino/aliyungo v0.0.0-20190410085603-611ead8a6fed
github.com/go-sql-driver/mysql v1.4.1
github.com/golang/protobuf v1.3.1
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.3.2
github.com/google/cadvisor v0.33.1
github.com/google/gofuzz v1.0.0 // indirect
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/influxdata/influxdb v1.7.7
github.com/kr/pretty v0.1.0 // indirect
github.com/olivere/elastic v6.2.23+incompatible
github.com/olivere/elastic v6.2.23+incompatible // indirect
github.com/olivere/elastic/v7 v7.0.6
github.com/pborman/uuid v1.2.0
github.com/prometheus/client_golang v1.0.0
github.com/riemann/riemann-go-client v0.4.0
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9
github.com/smartystreets/gunit v1.0.0 // indirect
github.com/spf13/pflag v1.0.3 // indirect
github.com/stretchr/testify v1.3.0
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 // indirect
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
github.com/stretchr/testify v1.4.0
golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect
google.golang.org/appengine v1.5.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/olivere/elastic.v3 v3.0.75
gopkg.in/olivere/elastic.v5 v5.0.81
gopkg.in/olivere/elastic.v6 v6.2.23
k8s.io/api v0.0.0-20190627205229-acea843d18eb
k8s.io/apimachinery v0.0.0-20190627205106-bc5732d141a8
k8s.io/api v0.17.3
k8s.io/apimachinery v0.17.3
k8s.io/apiserver v0.0.0-20190606205144-71ebb8303503
k8s.io/client-go v11.0.0+incompatible
k8s.io/klog v0.3.1
k8s.io/utils v0.0.0-20190607212802-c55fbcfc754a // indirect
sigs.k8s.io/yaml v1.1.0 // indirect
k8s.io/component-base v0.17.3
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20200204173128-addea2498afe // indirect
)

replace (
Expand Down
Loading

0 comments on commit 57029ea

Please sign in to comment.