feat(repair): move to per-table, not cached ring description
Michal-Leszczynski committed Feb 22, 2024
1 parent d86640b commit e070900
Showing 7 changed files with 811 additions and 874 deletions.
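
In short, this commit stops caching the ring description on Target and instead fetches it per keyspace while repairing, handing each table to its own tableGenerator. Below is a minimal, self-contained sketch of that new control flow; the types and bodies are simplified stand-ins for illustration, not the actual scylla-manager implementation:

    package main

    import "fmt"

    // ring is a stand-in for scyllaclient.Ring.
    type ring struct{ keyspace string }

    // describeRing stands in for client.DescribeRing: with this commit the
    // ring is fetched fresh per keyspace instead of being cached on Target.
    func describeRing(keyspace string) ring { return ring{keyspace: keyspace} }

    // tableGenerator is a stand-in for the real tableGenerator: one per
    // table, sharing the ring fetched for its keyspace.
    type tableGenerator struct {
        keyspace, table string
        ring            ring
    }

    func (tg tableGenerator) Run() error {
        fmt.Printf("repairing %s.%s against ring of %s\n", tg.keyspace, tg.table, tg.ring.keyspace)
        return nil
    }

    func main() {
        plan := map[string][]string{"ks1": {"t1", "t2"}, "ks2": {"t1"}}
        var genErr error
        for ks, tables := range plan {
            r := describeRing(ks) // one DescribeRing call per keyspace
            for _, t := range tables {
                tg := tableGenerator{keyspace: ks, table: t, ring: r}
                // As in the real generator.Run, only the first error is kept;
                // the rest are assumed to be logged.
                if err := tg.Run(); err != nil && genErr == nil {
                    genErr = err
                }
            }
        }
        fmt.Println("first error:", genErr)
    }

A plausible motivation (not stated in the commit itself) is that ring and topology information stays fresh for each keyspace rather than being captured once at planning time.
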
292 changes: 177 additions & 115 deletions pkg/service/repair/generator.go
@@ -6,12 +6,10 @@ import (
 	"context"
 	"math"
 	"sort"
-	"strings"
 	"sync/atomic"
 
 	"github.com/pkg/errors"
 	"github.com/scylladb/go-log"
-	"github.com/scylladb/go-set/strset"
 	"github.com/scylladb/scylla-manager/v3/pkg/dht"
 	"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
 	"github.com/scylladb/scylla-manager/v3/pkg/util/slice"
@@ -97,193 +95,257 @@ func (r jobResult) Success() bool {
 	return r.err == nil || errors.Is(r.err, errTableDeleted)
 }
 
+// Tools shared between generator and tableGenerator.
+type generatorTools struct {
+	ctl       controller
+	ms        masterSelector
+	submitter submitter[job, jobResult]
+	failFast  bool
+	stop      *atomic.Bool
+	logger    log.Logger
+}
+
+func (gt generatorTools) stopGenerating() {
+	gt.stop.Store(true)
+}
+
+// generator is responsible for creating and orchestrating tableGenerators.
 type generator struct {
+	generatorTools
+
+	target Target
 	plan   *plan
-	ctl    controller
-	ms     masterSelector
+	pm     ProgressManager
 	client *scyllaclient.Client
+}
 
-	logger   log.Logger
-	failFast bool
-
-	// Responsible for submitting jobs and receiving results.
-	submitter submitter[job, jobResult]
-	// Determines if generator should keep on generating new jobs.
-	stop atomic.Bool
-
-	// Statistics for logging purposes.
-	count       int
-	success     int
-	failed      int
-	lastPercent int
+func (g *generator) shouldGenerate() bool {
+	return !g.stop.Load()
 }

-func newGenerator(ctx context.Context, target Target, client *scyllaclient.Client,
-	i intensityChecker, s submitter[job, jobResult], logger log.Logger,
+func newGenerator(ctx context.Context, target Target, client *scyllaclient.Client, i intensityChecker,
+	s submitter[job, jobResult], plan *plan, pm ProgressManager, logger log.Logger,
 ) (*generator, error) {
-	var ord, cnt int
-	for _, kp := range target.plan.Keyspaces {
-		for _, tp := range kp.Tables {
-			cnt += len(kp.TokenRepIdx)
-			ord++
-			logger.Info(ctx, "Repair order",
-				"order", ord,
-				"keyspace", kp.Keyspace,
-				"table", tp.Table,
-				"size", tp.Size,
-				"merge_ranges", tp.Optimize,
-			)
-		}
-	}
-
-	hosts := target.plan.Hosts()
 	status, err := client.Status(ctx)
 	if err != nil {
 		return nil, errors.Wrap(err, "get status")
 	}
-	if down := strset.New(status.Down().Hosts()...); down.HasAny(hosts...) {
-		return nil, errors.Errorf("ensure nodes are up, down nodes: %s", strings.Join(down.List(), ","))
-	}
 	closestDC, err := client.ClosestDC(ctx, status.DatacenterMap(target.DC))
 	if err != nil {
 		return nil, errors.Wrap(err, "calculate closest dc")
 	}
 
-	shards, err := client.HostsShardCount(ctx, hosts)
+	shards, err := client.HostsShardCount(ctx, plan.Hosts)
 	if err != nil {
 		return nil, err
 	}
 
 	return &generator{
-		plan:        target.plan,
-		ctl:         newRowLevelRepairController(i),
-		ms:          newMasterSelector(shards, status.HostDC(), closestDC),
-		client:      client,
-		logger:      logger,
-		failFast:    target.FailFast,
-		submitter:   s,
-		count:       cnt,
-		lastPercent: -1,
+		generatorTools: generatorTools{
+			ctl:       newRowLevelRepairController(i),
+			ms:        newMasterSelector(shards, status.HostDC(), closestDC),
+			submitter: s,
+			failFast:  target.FailFast,
+			stop:      &atomic.Bool{},
+			logger:    logger,
+		},
+		target: target,
+		plan:   plan,
+		pm:     pm,
+		client: client,
 	}, nil
 }
}

 func (g *generator) Run(ctx context.Context) error {
 	g.logger.Info(ctx, "Start generator")
-	running := true
-
-	g.generateJobs()
-	if g.shouldExit() {
-		running = false
-	}
-
-	for running {
-		if ctx.Err() != nil {
-			break
-		}
-		select {
-		case <-ctx.Done():
-			running = false
-		case r := <-g.submitter.Results():
-			g.processResult(ctx, r)
-			g.generateJobs()
-			if g.shouldExit() {
-				running = false
-			}
-		}
-	}
+	status, err := g.client.Status(ctx)
+	if err != nil {
+		return errors.Wrap(err, "get status")
+	}
+	var genErr error
+
+	for _, ksp := range g.plan.Keyspaces {
+		if !g.shouldGenerate() {
+			break
+		}
+
+		ring, err := g.client.DescribeRing(ctx, ksp.Keyspace)
+		if err != nil {
+			return errors.Wrap(err, "describe ring")
+		}
+		_ = filteredRing(g.target, status, ring)
+
+		for _, tp := range ksp.Tables {
+			if !g.shouldGenerate() {
+				break
+			}
+
+			tg := g.newTableGenerator(ksp.Keyspace, tp, ring)
+			// All errors are logged, so in order to reduce clutter,
+			// return only the first one.
+			if err := tg.Run(ctx); err != nil && genErr == nil {
+				genErr = err
+			}
+		}
+	}
 
 	g.logger.Info(ctx, "Close generator")
 	g.submitter.Close() // Free workers waiting on next
-	// Don't return ctx error as graceful ctx is handled from service level
-	if g.failed > 0 {
-		return errors.Errorf("repair %d token ranges out of %d", g.failed, g.count)
-	}
-	return nil
+	return errors.Wrap(genErr, "see more errors in logs")
 }

+func (g *generator) newTableGenerator(keyspace string, tp tablePlan, ring scyllaclient.Ring) *tableGenerator {
+	todoRanges := make(map[scyllaclient.TokenRange]struct{})
+	for _, rt := range ring.ReplicaTokens {
+		for _, r := range rt.Ranges {
+			todoRanges[r] = struct{}{}
+		}
+	}
+	done := g.pm.GetDoneRanges(keyspace, tp.Table)
+	for _, r := range done {
+		delete(todoRanges, r)
+	}
+
+	tg := &tableGenerator{
+		generatorTools: g.generatorTools,
+		Keyspace:       keyspace,
+		Table:          tp.Table,
+		Ring:           ring,
+		TodoRanges:     todoRanges,
+		DoneReplicas:   make(map[uint64]struct{}),
+		Optimize:       tp.Optimize,
+	}
+	tg.logger = tg.logger.Named(keyspace + "." + tp.Table)
+	return tg
+}

+// tableGenerator is responsible for generating and orchestrating
+// repair jobs of given table.
+type tableGenerator struct {
+	generatorTools
+
+	Keyspace     string
+	Table        string
+	Ring         scyllaclient.Ring
+	TodoRanges   map[scyllaclient.TokenRange]struct{}
+	DoneReplicas map[uint64]struct{}
+	Optimize     bool
+	Deleted      bool
+	Err          error
+}

-func (g *generator) processResult(ctx context.Context, r jobResult) {
+func (tg *tableGenerator) Run(ctx context.Context) error {
+	tg.logger.Info(ctx, "Start repairing table", "keyspace", tg.Keyspace, "table", tg.Table)
+	tg.generateJobs()
+	for !tg.shouldExit() {
+		if ctx.Err() != nil {
+			break
+		}
+		select {
+		case <-ctx.Done():
+		case r := <-tg.submitter.Results():
+			tg.processResult(ctx, r)
+			tg.generateJobs()
+		}
+	}
+	tg.logger.Info(ctx, "Ended repairing table", "keyspace", tg.Keyspace, "table", tg.Table)
+	return tg.Err
+}
+
+func (tg *tableGenerator) processResult(ctx context.Context, jr jobResult) {
 	// Don't record context errors
-	if errors.Is(r.err, context.Canceled) {
+	if errors.Is(jr.err, context.Canceled) {
 		return
 	}
 
-	if r.err != nil && errors.Is(r.err, errTableDeleted) {
-		g.logger.Info(ctx, "Detected table deletion", "keyspace", r.keyspace, "table", r.table)
-		g.plan.MarkDeleted(r.keyspace, r.table)
+	if jr.err != nil && errors.Is(jr.err, errTableDeleted) {
+		tg.logger.Info(ctx, "Detected table deletion", "keyspace", jr.keyspace, "table", jr.table)
+		tg.Deleted = true
 	}
 
-	g.plan.MarkDoneRanges(r.keyspace, r.table, len(r.ranges))
-	if r.Success() {
-		g.success += len(r.ranges)
-	} else {
-		g.logger.Error(ctx, "Repair failed", "error", r.err)
-		g.failed += len(r.ranges)
-		if g.failFast {
-			g.stopGenerating()
+	if !jr.Success() {
+		tg.logger.Error(ctx, "Repair failed", "error", jr.err)
+		// All errors are logged, so in order to reduce clutter,
+		// return only the first one.
+		if tg.Err == nil {
+			tg.Err = jr.err
+		}
+		if tg.failFast {
+			tg.stopGenerating()
 		}
 	}
 
-	if percent := 100 * (g.success + g.failed) / g.count; percent > g.lastPercent {
-		g.logger.Info(ctx, "Progress", "percent", percent, "count", g.count, "success", g.success, "failed", g.failed)
-		g.lastPercent = percent
-	}
-	g.ctl.Unblock(r.replicaSet)
+	tg.ctl.Unblock(jr.replicaSet)
 }

-func (g *generator) generateJobs() {
+func (tg *tableGenerator) generateJobs() {
 	for {
-		j, ok := g.newJob()
+		j, ok := tg.newJob()
 		if !ok {
 			return
 		}
-		g.submitter.Submit(j)
+		tg.submitter.Submit(j)
 	}
 }

// newJob tries to return job passing controller restrictions.
-func (g *generator) newJob() (job, bool) {
-	if ok := g.plan.UpdateIdx(); !ok {
-		g.stopGenerating()
-	}
-	if !g.shouldGenerate() {
+func (tg *tableGenerator) newJob() (job, bool) {
+	if !tg.shouldGenerate() {
 		return job{}, false
 	}
 
-	ksIdx := g.plan.Idx
-	kp := g.plan.Keyspaces[ksIdx]
-	tabIdx := kp.Idx
-	tp := kp.Tables[tabIdx]
-
-	for repIdx, rep := range kp.Replicas {
-		if kp.IsReplicaMarked(repIdx, tabIdx) {
+	for _, rt := range tg.Ring.ReplicaTokens {
+		repHash := scyllaclient.ReplicaHash(rt.ReplicaSet)
+		if _, ok := tg.DoneReplicas[repHash]; ok {
 			continue
 		}
 
-		if ranges := g.ctl.TryBlock(rep.ReplicaSet); ranges > 0 {
+		if cnt := tg.ctl.TryBlock(rt.ReplicaSet); cnt > 0 {
+			ranges := tg.getRangesToRepair(rt.Ranges, cnt)
+			if len(ranges) == 0 {
+				tg.DoneReplicas[repHash] = struct{}{}
+				tg.ctl.Unblock(rt.ReplicaSet)
+				continue
+			}
+
 			return job{
-				keyspace:   kp.Keyspace,
-				table:      tp.Table,
-				master:     g.ms.Select(rep.ReplicaSet),
-				replicaSet: rep.ReplicaSet,
-				ranges:     kp.GetRangesToRepair(repIdx, tabIdx, ranges),
-				optimize:   tp.Optimize,
-				deleted:    tp.Deleted,
+				keyspace:   tg.Keyspace,
+				table:      tg.Table,
+				master:     tg.ms.Select(rt.ReplicaSet),
+				replicaSet: rt.ReplicaSet,
+				ranges:     ranges,
+				optimize:   tg.Optimize,
+				deleted:    tg.Deleted,
 			}, true
 		}
 	}
 
 	return job{}, false
 }

-func (g *generator) stopGenerating() {
-	g.stop.Store(true)
+func (tg *tableGenerator) getRangesToRepair(allRanges []scyllaclient.TokenRange, cnt Intensity) []scyllaclient.TokenRange {
+	if tg.Optimize || tg.Deleted {
+		cnt = NewIntensity(len(allRanges))
+	}
+
+	var ranges []scyllaclient.TokenRange
+	for _, r := range allRanges {
+		if _, ok := tg.TodoRanges[r]; !ok {
+			continue
+		}
+		delete(tg.TodoRanges, r)
+		ranges = append(ranges, r)
+		if NewIntensity(len(ranges)) >= cnt {
+			break
+		}
+	}
+
+	return ranges
 }
 
-func (g *generator) shouldGenerate() bool {
-	return !g.stop.Load()
+func (tg *tableGenerator) shouldGenerate() bool {
+	return !tg.stop.Load() && len(tg.TodoRanges) > 0
 }
 
-func (g *generator) shouldExit() bool {
-	return !g.ctl.Busy() && !g.shouldGenerate()
+func (tg *tableGenerator) shouldExit() bool {
+	return !tg.ctl.Busy() && !tg.shouldGenerate()
 }
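
The new getRangesToRepair drains at most cnt (the repair intensity) pending ranges per job, and takes everything at once for optimized or deleted tables. Below is a standalone toy illustration of that chunking semantics, with plain ints standing in for scyllaclient.TokenRange and the todo map playing the role of TodoRanges; it is a sketch, not the real code:

    package main

    import "fmt"

    // takeRanges mimics tableGenerator.getRangesToRepair: it moves up to
    // cnt ranges from the todo set into a job, taking everything at once
    // when the whole table is repaired as a single job (optimize/deleted).
    func takeRanges(todo map[int]struct{}, all []int, cnt int, takeAll bool) []int {
        if takeAll {
            cnt = len(all)
        }
        var out []int
        for _, r := range all {
            if _, ok := todo[r]; !ok {
                continue // already repaired in an earlier run or job
            }
            delete(todo, r)
            out = append(out, r)
            if len(out) >= cnt {
                break
            }
        }
        return out
    }

    func main() {
        all := []int{1, 2, 3, 4, 5}
        todo := map[int]struct{}{1: {}, 2: {}, 4: {}, 5: {}} // 3 already done
        fmt.Println(takeRanges(todo, all, 2, false)) // [1 2]
        fmt.Println(takeRanges(todo, all, 2, false)) // [4 5]
        fmt.Println(takeRanges(todo, all, 2, false)) // []
    }

Successive calls drain the todo set, which is what lets shouldGenerate stop the table once len(TodoRanges) reaches zero.
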
3 changes: 0 additions & 3 deletions pkg/service/repair/model.go
@@ -40,9 +40,6 @@ type Target struct {
 	Intensity           Intensity `json:"intensity"`
 	Parallel            int       `json:"parallel"`
 	SmallTableThreshold int64     `json:"small_table_threshold"`
-	// Cache for repair plan so that it does not have to be generated
-	// in both GetTarget and Repair functions.
-	plan *plan `json:"-"`
 }

 // taskProperties is the main data structure of the runner.Properties blob.
