Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group instances into clusters #8

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type DBInstance struct {
Tags Tags
}

// DBCluster is a group of DBInstances
type DBCluster struct {
*rds.DBCluster
Instances []*DBInstance
Tags Tags
}

// Filters ...
type Filters map[string]string

Expand Down
27 changes: 27 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module github.com/seatgeek/aws-dynamic-consul-catalog

go 1.14

require (
github.com/armon/go-metrics v0.3.3 // indirect
github.com/aws/aws-sdk-go v1.13.5-0.20180223184012-ebef4262e06a
github.com/go-ini/ini v1.27.3-0.20170519023713-afbc45e87f3b // indirect
github.com/hashicorp/consul v1.1.0
github.com/hashicorp/go-msgpack v1.1.5 // indirect
github.com/hashicorp/go-rootcerts v0.0.0-20160503143440-6bb64b370b90 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/memberlist v0.2.2 // indirect
github.com/hashicorp/serf v0.8.2-0.20170419221626-65c2babe73c7 // indirect
github.com/imkira/go-observer v1.0.3
github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7 // indirect
github.com/mitchellh/go-homedir v0.0.0-20161203194507-b8bc1bf76747 // indirect
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/patrickmn/go-cache v2.0.1-0.20170418232947-7ac151875ffb+incompatible
github.com/pkg/errors v0.9.1 // indirect
github.com/seatgeek/logrus-gelf-formatter v0.0.0-20180829220724-ce23ecb3f367
github.com/sirupsen/logrus v1.4.2
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/testify v1.5.1 // indirect
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 // indirect
gopkg.in/urfave/cli.v1 v1.19.1
)
180 changes: 180 additions & 0 deletions go.sum

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion service/rds/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,18 @@ func (r *RDS) Run() {

allInstances := observer.NewProperty(nil)
filteredInstances := observer.NewProperty(nil)

writableInstances := observer.NewProperty(nil)
writableClusters := observer.NewProperty(nil)

catalogState := &config.CatalogState{}

go r.backend.CatalogReader(catalogState, r.consulNodeName, r.quitCh)
go r.reader(allInstances)
go r.filter(allInstances, filteredInstances)
go r.writer(filteredInstances, catalogState)
go r.clusterize(filteredInstances, writableInstances, writableClusters)
go r.writer(writableInstances, catalogState)
go r.clusterWriter(writableClusters, catalogState)

<-r.quitCh
}
85 changes: 85 additions & 0 deletions service/rds/clusterize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package rds

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/rds"
observer "github.com/imkira/go-observer"
"github.com/seatgeek/aws-dynamic-consul-catalog/config"
log "github.com/sirupsen/logrus"
)

func (r *RDS) clusterize(inInstancesProp, outInstancesProp, outClustersProp observer.Property) {
logger := log.WithField("worker", "clusterize")
logger.Info("Starting RDS instance clusterize worker")
stream := inInstancesProp.Observe()

for {
select {
case <-r.quitCh:
return

// wait for changes
case <-stream.Changes():
logger.Debug("Clusterizing instances")

stream.Next()
instances := stream.Value().([]*config.DBInstance)

standaloneInstances := make([]*config.DBInstance, 0)
clusters := make(map[string]*config.DBCluster)

for _, instance := range instances {
clusterID := aws.StringValue(instance.DBClusterIdentifier)
if clusterID == "" {
standaloneInstances = append(standaloneInstances, instance)
logger.Debugf("Instance %s is not part of a cluster", aws.StringValue(instance.DBInstanceIdentifier))
continue
}

logger.Debugf("Instance %s is part of cluster: %s",
aws.StringValue(instance.DBInstanceIdentifier), clusterID)
cluster, ok := clusters[clusterID]
if !ok {
rdsCluster := r.readDBCluster(clusterID)
if rdsCluster == nil {
log.Errorf("Error fetching cluster for instance: %s with ClusterID: %s (skipping)",
aws.StringValue(instance.DBInstanceIdentifier), clusterID)
continue
}

cluster = &config.DBCluster{
DBCluster: rdsCluster,
Instances: []*config.DBInstance{},
Tags: make(config.Tags),
}
clusters[clusterID] = cluster
}
// attach this instance to the cluster...
cluster.Instances = append(cluster.Instances, instance)
}
outInstancesProp.Update(standaloneInstances)

clusterList := []*config.DBCluster{}
for _, v := range clusters {
clusterList = append(clusterList, v)
}
outClustersProp.Update(clusterList)
logger.Debugf("Finished clusterizing RDS instances. Clusters: %d Instances: %d",
len(clusters), len(instances))
}
}
}

func (r *RDS) readDBCluster(clusterID string) *rds.DBCluster {
input := &rds.DescribeDBClustersInput{DBClusterIdentifier: aws.String(clusterID)}
output, err := r.rds.DescribeDBClusters(input)
if err != nil {
log.Fatal(err)
}
for _, cluster := range output.DBClusters {
if clusterID == aws.StringValue(cluster.DBClusterIdentifier) {
return cluster
}
}
return nil
}
229 changes: 229 additions & 0 deletions service/rds/clusterwriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package rds

import (
"fmt"
"os"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/imkira/go-observer"
"github.com/seatgeek/aws-dynamic-consul-catalog/config"
log "github.com/sirupsen/logrus"
)

func (r *RDS) clusterWriter(inClustersProp observer.Property, state *config.CatalogState) {
logger := log.WithField("worker", "clusterWriter")
logger.Info("Starting RDS Consul Catalog clusterWriter")

stream := inClustersProp.Observe()
for {
select {
case <-r.quitCh:
return

// wait for changes
case <-stream.Changes():
state.Lock()
stream.Next()
clusters := stream.Value().([]*config.DBCluster)

seen := state.Services.GetSeen()

found := &config.SeenCatalog{
Services: make([]string, 0),
Checks: make([]string, 0),
}

for _, cluster := range clusters {
r.writeClusterBackendCatalog(cluster, logger, state, found)
}

for _, service := range r.getDifference(seen.Services, found.Services) {
logger.Warnf("Deleting service %s", service)
r.backend.DeleteService(service, r.consulNodeName)
}

for _, check := range r.getDifference(seen.Checks, found.Checks) {
logger.Warnf("Deleting check %s", check)
r.backend.DeleteCheck(check, r.consulNodeName)
}

logger.Debug("Finished Consul Catalog cluster write")
state.Unlock()
}
}
}

func (r *RDS) writeClusterBackendCatalog(cluster *config.DBCluster, logger *log.Entry, state *config.CatalogState, seen *config.SeenCatalog) {
logger = logger.WithField("cluster", aws.StringValue(cluster.DBClusterIdentifier))

name := r.getClusterServiceName(cluster)
if name == "" {
return
}
id := aws.StringValue(cluster.DBClusterIdentifier)

if *cluster.Status == "creating" {
logger.Warnf("Cluster %s id being created, skipping for now", name)
return
}

if cluster.Endpoint == nil {
logger.Errorf("Cluster %s does not have an endpoint yet, the cluster is in state: %s",
name, *cluster.Status)
return
}

clusterStatus := extractStatus(aws.StringValue(cluster.Status))

memberRole := map[string]string{}
for _, member := range cluster.DBClusterMembers {
if aws.BoolValue(member.IsClusterWriter) {
memberRole[aws.StringValue(member.DBInstanceIdentifier)] = "cluster-writer"
} else {
memberRole[aws.StringValue(member.DBInstanceIdentifier)] = "cluster-reader"
}
}

for _, instance := range cluster.Instances {
tags := []string{}
for k, v := range instance.Tags {
tags = append(tags, fmt.Sprintf("%s-%s", k, v))
}
tags = append(tags, fmt.Sprintf("cluster-rw-role-%s",
memberRole[aws.StringValue(instance.DBInstanceIdentifier)]))
service := &config.Service{
ServiceID: id,
ServiceName: name,
ServicePort: int(aws.Int64Value(instance.DbInstancePort)),
ServiceTags: tags,
CheckID: fmt.Sprintf("service:%s Node:%s", id, aws.StringValue(instance.DBInstanceIdentifier)),
CheckNode: aws.StringValue(instance.DBInstanceIdentifier),
CheckNotes: fmt.Sprintf("RDS Instance Status: %s", aws.StringValue(instance.DBInstanceStatus)),
CheckStatus: clusterStatus,
CheckOutput: fmt.Sprintf("Pending tasks: %s\n\nAddr: %s\n\nmanaged by aws-dynamic-consul-catalog",
instance.PendingModifiedValues.GoString(), aws.StringValue(instance.Endpoint.Address)),
ServiceAddress: aws.StringValue(instance.Endpoint.Address),
}

service.ServiceMeta = make(map[string]string)
service.ServiceMeta["Engine"] = aws.StringValue(instance.Engine)
service.ServiceMeta["EngineVersion"] = aws.StringValue(instance.EngineVersion)
service.ServiceMeta["DBName"] = aws.StringValue(instance.DBName)
service.ServiceMeta["DBInstanceClass"] = aws.StringValue(instance.DBInstanceClass)
service.ServiceMeta["DBInstanceIdentifier"] = aws.StringValue(instance.DBInstanceIdentifier)
service.ServiceMeta["RoleInCluster"] = memberRole[aws.StringValue(instance.DBInstanceIdentifier)]
service.ServiceMeta["ClusterName"] = aws.StringValue(cluster.DBClusterIdentifier)

if stringInSlice(service.ServiceAddress, seen.Services) {
logger.Errorf("Found duplicate Service ID %s - possible duplicate 'consul_service_name' RDS tag with same Replication Role", service.ServiceID)
if r.onDuplicate == "quit" {
os.Exit(1)
}
if r.onDuplicate == "ignore-skip-last" {
logger.Errorf("Ignoring current service")
return
}
}
seen.Services = append(seen.Services, service.ServiceAddress)

if stringInSlice(service.CheckID, seen.Checks) {
logger.Errorf("Found duplicate Check ID %s - possible duplicate 'consul_service_name' RDS tag with same Replication Role", service.CheckID)
if r.onDuplicate == "quit" {
os.Exit(1)
}
if r.onDuplicate == "ignore-skip-last" {
logger.Errorf("Ignoring current service")
return
}
}
seen.Checks = append(seen.Checks, service.CheckID)

existingService, ok := state.Services[id]
if ok {
logger.Debugf("Service %s exist in remote catalog, lets compare", id)

if r.identicalService(existingService, service, logger) {
logger.Debugf("Services are identical, skipping")
return
}

logger.Info("Services are not identical, updating catalog")
} else {
logger.Infof("Service %s doesn't exist in remote catalog, creating", id)
}

service.CheckOutput = service.CheckOutput + fmt.Sprintf("\n\nLast update: %s", time.Now().Format(time.RFC1123Z))
r.backend.WriteService(service)
}
}

func extractStatus(status string) string {
switch status {
case "backing-up":
status = "passing"
case "available":
status = "passing"
case "maintenance":
status = "passing"
case "modifying":
status = "passing"
case "creating":
status = "critical"
case "deleting":
status = "critical"
case "failed":
status = "critical"
case "rebooting":
status = "passing"
case "renaming":
status = "critical"
case "restore-error":
status = "critical"
case "inaccessible-encryption-credentials":
status = "critical"
case "incompatible-credentials":
status = "critical"
case "incompatible-network":
status = "critical"
case "incompatible-option-group":
status = "critical"
case "incompatible-parameters":
status = "critical"
case "incompatible-restore":
status = "critical"
case "resetting-master-credentials":
status = "warning"
case "storage-optimization":
status = "passing"
case "storage-full":
status = "warning"
case "upgrading":
status = "warning"
default:
status = "passing"
}
return status
}

func (r *RDS) getClusterServiceName(cluster *config.DBCluster) string {
// prefer the consul_service_name from instance tags
if name, ok := cluster.Tags["consul_service_name"]; ok {
return r.servicePrefix + name + r.serviceSuffix
}

// derive from the instance DB cluster name
name := aws.StringValue(cluster.DBClusterIdentifier)
if name != "" {
return r.servicePrefix + name + r.serviceSuffix
}

// derive from the instance DB name
name = aws.StringValue(cluster.DatabaseName)
if name != "" {
return r.servicePrefix + name + r.serviceSuffix
}

log.Errorf("Failed to find service name for " + aws.StringValue(cluster.DBClusterArn))
return ""
}
17 changes: 17 additions & 0 deletions service/rds/kill.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

leader="$(curl https://consul.internal.classdojo.com/v1/status/leader | sed 's/:8300//' | sed 's/"//g')"
while :
do
serviceID="$(curl http://$leader:8500/v1/health/state/critical | ./jq '.[0].ServiceID' | sed 's/"//g')"
node="$(curl http://$leader:8500/v1/health/state/critical | ./jq '.[0].Node' | sed 's/"//g')"
echo "serviceID=$serviceID, node=$node"
size=${#serviceID}
echo "size=$size"
if [ $size -ge 7 ]; then
curl --request PUT http://$node:8500/v1/agent/service/deregister/$serviceID
else
break
fi
done
curl http://$leader:8500/v1/health/state/critical
Loading