diff --git a/pkg/scyllaclient/client_scylla.go b/pkg/scyllaclient/client_scylla.go index a172e57067..c5100b9767 100644 --- a/pkg/scyllaclient/client_scylla.go +++ b/pkg/scyllaclient/client_scylla.go @@ -20,6 +20,7 @@ import ( "github.com/pkg/errors" "github.com/scylladb/go-set/strset" "github.com/scylladb/scylla-manager/v3/pkg/dht" + "github.com/scylladb/scylla-manager/v3/pkg/util/maputil" "github.com/scylladb/scylla-manager/v3/pkg/util/slice" "go.uber.org/multierr" @@ -383,6 +384,9 @@ func (c *Client) DescribeRing(ctx context.Context, keyspace string) (Ring, error if err != nil { return Ring{}, err } + if len(resp.Payload) == 0 { + return Ring{}, errors.New("received empty token range list") + } ring := Ring{ ReplicaTokens: make([]ReplicaTokenRanges, 0), @@ -393,6 +397,9 @@ func (c *Client) DescribeRing(ctx context.Context, keyspace string) (Ring, error replicaTokens := make(map[uint64][]TokenRange) replicaHash := make(map[uint64][]string) + isNetworkTopologyStrategy := true + rf := len(resp.Payload[0].Endpoints) + var dcRF map[string]int for _, p := range resp.Payload { // Parse tokens startToken, err := strconv.ParseInt(p.StartToken, 10, 64) @@ -415,6 +422,21 @@ func (c *Client) DescribeRing(ctx context.Context, keyspace string) (Ring, error EndToken: endToken, }) + // Update replication factors + if rf != len(p.Endpoints) { + return Ring{}, errors.Errorf("different token ranges have different rf (%d/%d). Repair is not safe for now", rf, len(p.Endpoints)) + } + tokenDCrf := make(map[string]int) + for _, e := range p.EndpointDetails { + tokenDCrf[e.Datacenter]++ + } + // NetworkTopologyStrategy -> all token ranges have the same dc to rf mapping + if dcRF == nil || maputil.Equal(dcRF, tokenDCrf) { + dcRF = tokenDCrf + } else { + isNetworkTopologyStrategy = false + } + // Update host to DC mapping for _, e := range p.EndpointDetails { ring.HostDC[e.Host] = e.Datacenter @@ -443,16 +465,15 @@ func (c *Client) DescribeRing(ctx context.Context, keyspace string) (Ring, error } // Detect replication strategy - if len(ring.HostDC) == 1 { + ring.RF = rf + switch { + case len(ring.HostDC) == 1: ring.Replication = LocalStrategy - } else { + case isNetworkTopologyStrategy: ring.Replication = NetworkTopologyStrategy - for _, tokens := range dcTokens { - if tokens != len(resp.Payload) { - ring.Replication = SimpleStrategy - break - } - } + ring.DCrf = dcRF + default: + ring.Replication = SimpleStrategy } return ring, nil @@ -854,8 +875,14 @@ func (t HostKeyspaceTables) Hosts() []string { return s.List() } +// SizeReport extends HostKeyspaceTable with Size information. +type SizeReport struct { + HostKeyspaceTable + Size int64 +} + // TableDiskSizeReport returns total on disk size of tables in bytes.
-func (c *Client) TableDiskSizeReport(ctx context.Context, hostKeyspaceTables HostKeyspaceTables) ([]int64, error) { +func (c *Client) TableDiskSizeReport(ctx context.Context, hostKeyspaceTables HostKeyspaceTables) ([]SizeReport, error) { // Get shard count of a first node to estimate parallelism limit shards, err := c.ShardCount(ctx, "") if err != nil { @@ -864,7 +891,7 @@ func (c *Client) TableDiskSizeReport(ctx context.Context, hostKeyspaceTables Hos var ( limit = len(hostKeyspaceTables.Hosts()) * int(shards) - report = make([]int64, len(hostKeyspaceTables)) + report = make([]SizeReport, len(hostKeyspaceTables)) ) f := func(i int) error { @@ -881,7 +908,10 @@ func (c *Client) TableDiskSizeReport(ctx context.Context, hostKeyspaceTables Hos "size", size, ) - report[i] = size + report[i] = SizeReport{ + HostKeyspaceTable: v, + Size: size, + } return nil } diff --git a/pkg/scyllaclient/client_scylla_integration_test.go b/pkg/scyllaclient/client_scylla_integration_test.go index 80b6ad5d53..cd8752ab21 100644 --- a/pkg/scyllaclient/client_scylla_integration_test.go +++ b/pkg/scyllaclient/client_scylla_integration_test.go @@ -13,7 +13,9 @@ import ( "testing" "time" + "github.com/scylladb/scylla-manager/v3/pkg/testutils/db" . 
"github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig" + "github.com/scylladb/scylla-manager/v3/pkg/util/maputil" "github.com/google/go-cmp/cmp" "github.com/scylladb/go-log" @@ -71,6 +73,77 @@ func TestClientStatusIntegration(t *testing.T) { } } +func TestClientDescribeRingIntegration(t *testing.T) { + testCases := []struct { + replicationStmt string + replication scyllaclient.ReplicationStrategy + rf int + dcRF map[string]int + }{ + { + replicationStmt: "{'class': 'SimpleStrategy', 'replication_factor': 4}", + replication: scyllaclient.SimpleStrategy, + rf: 4, + }, + { + replicationStmt: "{'class': 'NetworkTopologyStrategy', 'dc1': 1}", + replication: scyllaclient.NetworkTopologyStrategy, + rf: 1, + dcRF: map[string]int{ + "dc1": 1, + }, + }, + { + replicationStmt: "{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}", + replication: scyllaclient.NetworkTopologyStrategy, + rf: 4, + dcRF: map[string]int{ + "dc1": 2, + "dc2": 2, + }, + }, + { + replicationStmt: "{'class': 'NetworkTopologyStrategy', 'dc1': 3, 'dc2': 3}", + replication: scyllaclient.NetworkTopologyStrategy, + rf: 6, + dcRF: map[string]int{ + "dc1": 3, + "dc2": 3, + }, + }, + } + + client, err := scyllaclient.NewClient(scyllaclient.TestConfig(ManagedClusterHosts(), AgentAuthToken()), log.NewDevelopment()) + if err != nil { + t.Fatal(err) + } + clusterSession := db.CreateSessionAndDropAllKeyspaces(t, client) + defer clusterSession.Close() + + for i := range testCases { + tc := testCases[i] + if err := clusterSession.ExecStmt("DROP KEYSPACE IF EXISTS test_ks"); err != nil { + t.Fatal(err) + } + if err := clusterSession.ExecStmt("CREATE KEYSPACE test_ks WITH replication = " + tc.replicationStmt); err != nil { + t.Fatal(err) + } + ring, err := client.DescribeRing(context.Background(), "test_ks") + if err != nil { + t.Fatal(err) + } + if tc.replication != ring.Replication { + t.Fatalf("Replication: expected %s, got %s", tc.replication, ring.Replication) + } + if tc.rf != ring.RF { + 
t.Fatalf("RF: expected %d, got %d", tc.rf, ring.RF) + } + if !maputil.Equal(tc.dcRF, ring.DCrf) { + t.Fatalf("DCrf: expected %v, got %v", tc.dcRF, ring.DCrf) + } + } +} + func TestClientActiveRepairsIntegration(t *testing.T) { client, err := scyllaclient.NewClient(scyllaclient.TestConfig(ManagedClusterHosts(), AgentAuthToken()), log.NewDevelopment()) if err != nil { diff --git a/pkg/scyllaclient/model.go b/pkg/scyllaclient/model.go index 3dc655e783..7140e4dc11 100644 --- a/pkg/scyllaclient/model.go +++ b/pkg/scyllaclient/model.go @@ -179,6 +179,8 @@ type Ring struct { ReplicaTokens []ReplicaTokenRanges HostDC map[string]string Replication ReplicationStrategy + RF int + DCrf map[string]int // initialized only for NetworkTopologyStrategy } // Datacenters returns a list of datacenters the keyspace is replicated in. diff --git a/pkg/service/backup/service.go b/pkg/service/backup/service.go index 481986e56c..7d10c93906 100644 --- a/pkg/service/backup/service.go +++ b/pkg/service/backup/service.go @@ -409,8 +409,8 @@ func (s *Service) GetTargetSize(ctx context.Context, clusterID uuid.UUID, target } var total int64 - for _, size := range report { - total += size + for _, sr := range report { + total += sr.Size } return total, err diff --git a/pkg/service/repair/generator.go b/pkg/service/repair/generator.go index dd21813cc4..ab5dd3d3fa 100644 --- a/pkg/service/repair/generator.go +++ b/pkg/service/repair/generator.go @@ -4,55 +4,47 @@ package repair import ( "context" - "math" - "sort" - "strings" "sync/atomic" "github.com/pkg/errors" "github.com/scylladb/go-log" - "github.com/scylladb/go-set/strset" "github.com/scylladb/scylla-manager/v3/pkg/dht" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" - "github.com/scylladb/scylla-manager/v3/pkg/util/slice" ) -// masterSelector describes each host priority for being repair master. -// Repair master is first chosen by smallest shard count, -// then by smallest dc RTT from SM. 
-type masterSelector map[string]int - -func newMasterSelector(shards map[string]uint, hostDC map[string]string, closestDC []string) masterSelector { - hosts := make([]string, 0, len(shards)) - for h := range shards { - hosts = append(hosts, h) - } +// generator is responsible for creating and orchestrating tableGenerators. +type generator struct { + generatorTools - sort.Slice(hosts, func(i, j int) bool { - if shards[hosts[i]] != shards[hosts[j]] { - return shards[hosts[i]] < shards[hosts[j]] - } - return slice.Index(closestDC, hostDC[hosts[i]]) < slice.Index(closestDC, hostDC[hosts[j]]) - }) + target Target + plan *plan + pm ProgressManager + client *scyllaclient.Client +} - ms := make(masterSelector) - for i, h := range hosts { - ms[h] = i - } - return ms +// tableGenerator is responsible for generating and orchestrating +// repair jobs of given table. +type tableGenerator struct { + generatorTools + + Keyspace string + Table string + Ring scyllaclient.Ring + TodoRanges map[scyllaclient.TokenRange]struct{} + DoneReplicas map[uint64]struct{} + Optimize bool + Deleted bool + Err error } -// Select returns repair master from replica set. -func (ms masterSelector) Select(replicas []string) string { - var master string - p := math.MaxInt64 - for _, r := range replicas { - if ms[r] < p { - p = ms[r] - master = r - } - } - return master +// Tools shared between generator and tableGenerator. +type generatorTools struct { + target Target + ctl controller + ms masterSelector + submitter submitter[job, jobResult] + stop *atomic.Bool + logger log.Logger } type submitter[T, R any] interface { @@ -73,202 +65,173 @@ type job struct { deleted bool } -// tryOptimizeRanges returns either predefined ranges -// or one full token range for small fully replicated tables. 
-func (j job) tryOptimizeRanges() []scyllaclient.TokenRange { - if j.optimize { - return []scyllaclient.TokenRange{ - { - StartToken: dht.Murmur3MinToken, - EndToken: dht.Murmur3MaxToken, - }, - } - } - return j.ranges -} - type jobResult struct { job err error } -func (r jobResult) Success() bool { - // jobs of deleted tables are considered to by successful - return r.err == nil || errors.Is(r.err, errTableDeleted) -} - -type generator struct { - plan *plan - ctl controller - ms masterSelector - client *scyllaclient.Client - - logger log.Logger - failFast bool - - // Responsible for submitting jobs and receiving results. - submitter submitter[job, jobResult] - // Determines if generator should keep on generating new jobs. - stop atomic.Bool - - // Statistics for logging purposes. - count int - success int - failed int - lastPercent int -} - -func newGenerator(ctx context.Context, target Target, client *scyllaclient.Client, - i intensityChecker, s submitter[job, jobResult], logger log.Logger, +func newGenerator(ctx context.Context, target Target, client *scyllaclient.Client, i intensityChecker, + s submitter[job, jobResult], plan *plan, pm ProgressManager, logger log.Logger, ) (*generator, error) { - var ord, cnt int - for _, kp := range target.plan.Keyspaces { - for _, tp := range kp.Tables { - cnt += len(kp.TokenRepIdx) - ord++ - logger.Info(ctx, "Repair order", - "order", ord, - "keyspace", kp.Keyspace, - "table", tp.Table, - "size", tp.Size, - "merge_ranges", tp.Optimize, - ) - } - } - - hosts := target.plan.Hosts() status, err := client.Status(ctx) if err != nil { return nil, errors.Wrap(err, "get status") } - if down := strset.New(status.Down().Hosts()...); down.HasAny(hosts...) 
{ - return nil, errors.Errorf("ensure nodes are up, down nodes: %s", strings.Join(down.List(), ",")) - } closestDC, err := client.ClosestDC(ctx, status.DatacenterMap(target.DC)) if err != nil { return nil, errors.Wrap(err, "calculate closest dc") } - shards, err := client.HostsShardCount(ctx, hosts) + shards, err := client.HostsShardCount(ctx, plan.Hosts) if err != nil { return nil, err } return &generator{ - plan: target.plan, - ctl: newRowLevelRepairController(i), - ms: newMasterSelector(shards, status.HostDC(), closestDC), - client: client, - logger: logger, - failFast: target.FailFast, - submitter: s, - count: cnt, - lastPercent: -1, + generatorTools: generatorTools{ + target: target, + ctl: newRowLevelRepairController(i), + ms: newMasterSelector(shards, status.HostDC(), closestDC), + submitter: s, + stop: &atomic.Bool{}, + logger: logger, + }, + target: target, + plan: plan, + pm: pm, + client: client, }, nil } func (g *generator) Run(ctx context.Context) error { g.logger.Info(ctx, "Start generator") - running := true + var genErr error - g.generateJobs() - if g.shouldExit() { - running = false - } - - for running { - if ctx.Err() != nil { + for _, ksp := range g.plan.Keyspaces { + if !g.shouldGenerate() { break } - select { - case <-ctx.Done(): - running = false - case r := <-g.submitter.Results(): - g.processResult(ctx, r) - g.generateJobs() - if g.shouldExit() { - running = false + + ring, err := g.client.DescribeRing(ctx, ksp.Keyspace) + if err != nil { + return errors.Wrap(err, "describe ring") + } + + for _, tp := range ksp.Tables { + if !g.shouldGenerate() { + break + } + + tg := g.newTableGenerator(ksp.Keyspace, tp, ring) + // All errors are logged, so in order to reduce clutter, + // return only the first one. 
+ if err := tg.Run(ctx); err != nil && genErr == nil { + genErr = err } } } g.logger.Info(ctx, "Close generator") g.submitter.Close() // Free workers waiting on next - - // Don't return ctx error as graceful ctx is handled from service level - if g.failed > 0 { - return errors.Errorf("repair %d token ranges out of %d", g.failed, g.count) - } - return nil + return errors.Wrap(genErr, "see more errors in logs") } -func (g *generator) processResult(ctx context.Context, r jobResult) { - // Don't record context errors - if errors.Is(r.err, context.Canceled) { - return +func (g *generator) newTableGenerator(keyspace string, tp tablePlan, ring scyllaclient.Ring) *tableGenerator { + todoRanges := make(map[scyllaclient.TokenRange]struct{}) + for _, rt := range ring.ReplicaTokens { + for _, r := range rt.Ranges { + todoRanges[r] = struct{}{} + } } - - if r.err != nil && errors.Is(r.err, errTableDeleted) { - g.logger.Info(ctx, "Detected table deletion", "keyspace", r.keyspace, "table", r.table) - g.plan.MarkDeleted(r.keyspace, r.table) + done := g.pm.GetCompletedRanges(keyspace, tp.Table) + for _, r := range done { + delete(todoRanges, r) } - g.plan.MarkDoneRanges(r.keyspace, r.table, len(r.ranges)) - if r.Success() { - g.success += len(r.ranges) - } else { - g.logger.Error(ctx, "Repair failed", "error", r.err) - g.failed += len(r.ranges) - if g.failFast { - g.stopGenerating() - } + tg := &tableGenerator{ + generatorTools: g.generatorTools, + Keyspace: keyspace, + Table: tp.Table, + Ring: ring, + TodoRanges: todoRanges, + DoneReplicas: make(map[uint64]struct{}), + Optimize: tp.Optimize, } + tg.logger = tg.logger.Named(keyspace + "." 
+ tp.Table) + return tg +} + +func (g *generator) shouldGenerate() bool { + return !g.stop.Load() +} - if percent := 100 * (g.success + g.failed) / g.count; percent > g.lastPercent { - g.logger.Info(ctx, "Progress", "percent", percent, "count", g.count, "success", g.success, "failed", g.failed) - g.lastPercent = percent +func (tg *tableGenerator) Run(ctx context.Context) error { + tg.logger.Info(ctx, "Start table generator") + tg.generateJobs() + for !tg.shouldExit() { + if ctx.Err() != nil { + break + } + select { + case <-ctx.Done(): + case r := <-tg.submitter.Results(): + tg.processResult(ctx, r) + tg.generateJobs() + } } - g.ctl.Unblock(r.replicaSet) + tg.logger.Info(ctx, "Close table generator") + return tg.Err } -func (g *generator) generateJobs() { +func (tg *tableGenerator) generateJobs() { for { - j, ok := g.newJob() + j, ok := tg.newJob() if !ok { return } - g.submitter.Submit(j) + tg.submitter.Submit(j) } } // newJob tries to return job passing controller restrictions. -func (g *generator) newJob() (job, bool) { - if ok := g.plan.UpdateIdx(); !ok { - g.stopGenerating() - } - if !g.shouldGenerate() { +func (tg *tableGenerator) newJob() (job, bool) { + if !tg.shouldGenerate() { return job{}, false } - ksIdx := g.plan.Idx - kp := g.plan.Keyspaces[ksIdx] - tabIdx := kp.Idx - tp := kp.Tables[tabIdx] + for _, rt := range tg.Ring.ReplicaTokens { + // Calculate replica hash on not filtered replica set + // because different replica sets might be the same after filtering. 
+ repHash := scyllaclient.ReplicaHash(rt.ReplicaSet) + if _, ok := tg.DoneReplicas[repHash]; ok { + continue + } - for repIdx, rep := range kp.Replicas { - if kp.IsReplicaMarked(repIdx, tabIdx) { + filtered := filterReplicaSet(rt.ReplicaSet, tg.Ring.HostDC, tg.target) + if len(filtered) == 0 { + tg.DoneReplicas[repHash] = struct{}{} + for _, r := range rt.Ranges { + delete(tg.TodoRanges, r) + } continue } - if ranges := g.ctl.TryBlock(rep.ReplicaSet); ranges > 0 { + if cnt := tg.ctl.TryBlock(filtered); cnt > 0 { + ranges := tg.getRangesToRepair(rt.Ranges, cnt) + if len(ranges) == 0 { + tg.DoneReplicas[repHash] = struct{}{} + tg.ctl.Unblock(filtered) + continue + } + return job{ - keyspace: kp.Keyspace, - table: tp.Table, - master: g.ms.Select(rep.ReplicaSet), - replicaSet: rep.ReplicaSet, - ranges: kp.GetRangesToRepair(repIdx, tabIdx, ranges), - optimize: tp.Optimize, - deleted: tp.Deleted, + keyspace: tg.Keyspace, + table: tg.Table, + master: tg.ms.Select(filtered), + replicaSet: filtered, + ranges: ranges, + optimize: tg.Optimize, + deleted: tg.Deleted, }, true } } @@ -276,14 +239,78 @@ func (g *generator) newJob() (job, bool) { return job{}, false } -func (g *generator) stopGenerating() { - g.stop.Store(true) +func (tg *tableGenerator) getRangesToRepair(allRanges []scyllaclient.TokenRange, cnt Intensity) []scyllaclient.TokenRange { + if tg.Optimize || tg.Deleted { + cnt = NewIntensity(len(allRanges)) + } + + var ranges []scyllaclient.TokenRange + for _, r := range allRanges { + if _, ok := tg.TodoRanges[r]; !ok { + continue + } + delete(tg.TodoRanges, r) + ranges = append(ranges, r) + if NewIntensity(len(ranges)) >= cnt { + break + } + } + + return ranges } -func (g *generator) shouldGenerate() bool { - return !g.stop.Load() +func (tg *tableGenerator) processResult(ctx context.Context, jr jobResult) { + // Don't record context errors + if errors.Is(jr.err, context.Canceled) { + return + } + + if jr.err != nil && errors.Is(jr.err, errTableDeleted) { + 
tg.logger.Info(ctx, "Detected table deletion", "keyspace", jr.keyspace, "table", jr.table) + tg.Deleted = true + } + + if !jr.Success() { + tg.logger.Error(ctx, "Repair failed", "error", jr.err) + // All errors are logged, so in order to reduce clutter, + // return only the first one. + if tg.Err == nil { + tg.Err = jr.err + } + if tg.target.FailFast { + tg.stopGenerating() + } + } + tg.ctl.Unblock(jr.replicaSet) +} + +func (gt generatorTools) stopGenerating() { + gt.stop.Store(true) } -func (g *generator) shouldExit() bool { - return !g.ctl.Busy() && !g.shouldGenerate() +func (tg *tableGenerator) shouldGenerate() bool { + return !tg.stop.Load() && len(tg.TodoRanges) > 0 +} + +func (tg *tableGenerator) shouldExit() bool { + return !tg.ctl.Busy() && !tg.shouldGenerate() +} + +// tryOptimizeRanges returns either predefined ranges +// or one full token range for small fully replicated tables. +func (j job) tryOptimizeRanges() []scyllaclient.TokenRange { + if j.optimize { + return []scyllaclient.TokenRange{ + { + StartToken: dht.Murmur3MinToken, + EndToken: dht.Murmur3MaxToken, + }, + } + } + return j.ranges +} + +func (r jobResult) Success() bool { + // jobs of deleted tables are considered to be successful + return r.err == nil || errors.Is(r.err, errTableDeleted) } diff --git a/pkg/service/repair/master_selector.go b/pkg/service/repair/master_selector.go new file mode 100644 index 0000000000..266aab0120 --- /dev/null +++ b/pkg/service/repair/master_selector.go @@ -0,0 +1,48 @@ +// Copyright (C) 2024 ScyllaDB + +package repair + +import ( + "math" + "sort" + + "github.com/scylladb/scylla-manager/v3/pkg/util/slice" +) + +// masterSelector describes each host priority for being repair master. +// Repair master is first chosen by smallest shard count, +// then by smallest dc RTT from SM.
+type masterSelector map[string]int + +func newMasterSelector(shards map[string]uint, hostDC map[string]string, closestDC []string) masterSelector { + hosts := make([]string, 0, len(shards)) + for h := range shards { + hosts = append(hosts, h) + } + + sort.Slice(hosts, func(i, j int) bool { + if shards[hosts[i]] != shards[hosts[j]] { + return shards[hosts[i]] < shards[hosts[j]] + } + return slice.Index(closestDC, hostDC[hosts[i]]) < slice.Index(closestDC, hostDC[hosts[j]]) + }) + + ms := make(masterSelector) + for i, h := range hosts { + ms[h] = i + } + return ms +} + +// Select returns repair master from replica set. +func (ms masterSelector) Select(replicas []string) string { + var master string + p := math.MaxInt64 + for _, r := range replicas { + if ms[r] < p { + p = ms[r] + master = r + } + } + return master +} diff --git a/pkg/service/repair/model.go b/pkg/service/repair/model.go index 6872a3f98b..0a5dc896a9 100644 --- a/pkg/service/repair/model.go +++ b/pkg/service/repair/model.go @@ -40,9 +40,6 @@ type Target struct { Intensity Intensity `json:"intensity"` Parallel int `json:"parallel"` SmallTableThreshold int64 `json:"small_table_threshold"` - // Cache for repair plan so that it does not have to be generated - // in both GetTarget and Repair functions. - plan *plan `json:"-"` } // taskProperties is the main data structure of the runner.Properties blob. diff --git a/pkg/service/repair/plan.go b/pkg/service/repair/plan.go index cd15c8d933..b159163eb3 100644 --- a/pkg/service/repair/plan.go +++ b/pkg/service/repair/plan.go @@ -15,13 +15,38 @@ import ( // plan describes whole repair schedule and state. 
type plan struct { - Keyspaces []keyspacePlan - Idx int // Idx of currently repaired keyspace + Keyspaces keyspacePlans - SkippedKeyspaces []string + Hosts []string MaxParallel int MaxHostIntensity map[string]Intensity - HostTableSize map[scyllaclient.HostKeyspaceTable]int64 + // Used for progress purposes + Stats map[scyllaclient.HostKeyspaceTable]tableStats +} + +type keyspacePlans []keyspacePlan + +// keyspacePlan describes repair schedule and state for keyspace. +type keyspacePlan struct { + Keyspace string + Size int64 + Tables []tablePlan +} + +// tablePlan describes repair schedule and state for table. +type tablePlan struct { + Table string + Size int64 + RangesCnt int + ReplicaSetCnt int + // Optimized tables (small and fully replicated) + // have all ranges for replica set repaired in a single job. + Optimize bool +} + +type tableStats struct { + Size int64 + Ranges int } func newPlan(ctx context.Context, target Target, client *scyllaclient.Client) (*plan, error) { @@ -29,221 +54,89 @@ func newPlan(ctx context.Context, target Target, client *scyllaclient.Client) (* if err != nil { return nil, errors.Wrap(err, "get status") } - filtered := filteredHosts(target, status) + status.HostDC() + + var ( + ks keyspacePlans + ranges = make(map[scyllaclient.HostKeyspaceTable]int) + allHosts = strset.New() + maxP int + ) - p := new(plan) for _, u := range target.Units { ring, err := client.DescribeRing(ctx, u.Keyspace) if err != nil { return nil, errors.Wrapf(err, "keyspace %s: get ring description", u.Keyspace) } - // Allow repairing single node cluster for better UX and tests - if ring.Replication == scyllaclient.LocalStrategy && len(status) > 1 { + // Allow repairing single node cluster for better UX. 
+ if len(status) > 1 && !ShouldRepairRing(ring, target.DC, target.Host) { continue } - kp := keyspacePlan{ - Keyspace: u.Keyspace, - TokenRepIdx: make(map[scyllaclient.TokenRange]int), - AllTables: u.AllTables, - } + // Update max parallel + maxP = max(maxP, MaxRingParallel(ring, target.DC)) - skip := false - for _, rtr := range ring.ReplicaTokens { - // Skip the whole keyspace based on repaired dcs only - // (unless it's a single node cluster). - replicas := 0 - for _, h := range rtr.ReplicaSet { - if slice.ContainsString(target.DC, ring.HostDC[h]) { - replicas++ - } - } - if replicas <= 1 && len(status) > 1 { - skip = true - break - } - // Skip given replica sets based on all filtering factors - rtr.ReplicaSet = filteredReplicaSet(rtr.ReplicaSet, filtered, target.Host) - if len(rtr.ReplicaSet) <= 1 && len(status) > 1 { + // Update ranges and hosts + rangesCnt := 0 + replicaSetCnt := 0 + for _, rep := range ring.ReplicaTokens { + filtered := filterReplicaSet(rep.ReplicaSet, ring.HostDC, target) + if len(filtered) == 0 { continue } - for _, r := range rtr.Ranges { - kp.TokenRepIdx[r] = len(kp.Replicas) - } - kp.Replicas = append(kp.Replicas, rtr) - } + replicaSetCnt++ + allHosts.Add(filtered...) 
- if skip || len(kp.Replicas) == 0 { - p.SkippedKeyspaces = append(p.SkippedKeyspaces, u.Keyspace) - continue + for _, h := range filtered { + for _, t := range u.Tables { + ranges[newHostKsTable(h, u.Keyspace, t)] += len(rep.Ranges) + } + } + rangesCnt += len(rep.Ranges) } - // Fill tables + // Update table plan + var tables []tablePlan for _, t := range u.Tables { - kp.Tables = append(kp.Tables, tablePlan{ - Table: t, - MarkedRanges: make(map[scyllaclient.TokenRange]struct{}), - MarkedInReplica: make([]int, len(kp.Replicas)), + tables = append(tables, tablePlan{ + Table: t, + ReplicaSetCnt: replicaSetCnt, + RangesCnt: rangesCnt, }) } - p.Keyspaces = append(p.Keyspaces, kp) - } - - if len(p.Keyspaces) == 0 { - return nil, ErrEmptyRepair - } - if err := p.FillSize(ctx, client, target.SmallTableThreshold); err != nil { - return nil, errors.Wrap(err, "calculate tables size") - } - return p, nil -} -// UpdateIdx sets keyspace and table idx to the next not repaired table. -// Returns false if there are no more tables to repair. -func (p *plan) UpdateIdx() bool { - ksIdx := p.Idx - tabIdx := p.Keyspaces[ksIdx].Idx - - for ; ksIdx < len(p.Keyspaces); ksIdx++ { - kp := p.Keyspaces[ksIdx] - for ; tabIdx < len(kp.Tables); tabIdx++ { - // Always wait for current table to be fully repaired before moving to the next one - if !kp.IsTableRepaired(tabIdx) { - p.Idx = ksIdx - p.Keyspaces[ksIdx].Idx = tabIdx - return true - } - } - tabIdx = 0 - } - - return false -} - -// Hosts returns all hosts taking part in repair. -func (p *plan) Hosts() []string { - out := strset.New() - for _, kp := range p.Keyspaces { - out.Add(kp.Hosts()...) - } - return out.List() -} - -// Units returns repaired tables in unit format. 
-func (p *plan) Units() []Unit { - var out []Unit - for _, kp := range p.Keyspaces { - u := Unit{ - Keyspace: kp.Keyspace, - AllTables: kp.AllTables, - } - for _, tp := range kp.Tables { - u.Tables = append(u.Tables, tp.Table) - } - out = append(out, u) + ks = append(ks, keyspacePlan{ + Keyspace: u.Keyspace, + Tables: tables, + }) } - return out -} -// SetMaxParallel sets maximal repair parallelism. -func (p *plan) SetMaxParallel(dcMap map[string][]string) { - var max int - for _, kp := range p.Keyspaces { - // Max parallel is equal to the greatest max keyspace parallel - if cand := kp.maxParallel(dcMap); max < cand { - max = cand - } + if len(ks) == 0 { + return nil, ErrEmptyRepair } - p.MaxParallel = max -} -// SetMaxHostIntensity sets max_ranges_in_parallel for all repaired host. -func (p *plan) SetMaxHostIntensity(ctx context.Context, client *scyllaclient.Client) error { - hosts := p.Hosts() - shards, err := client.HostsShardCount(ctx, hosts) - if err != nil { - return err - } - memory, err := client.HostsTotalMemory(ctx, hosts) + // Update size and optimize + hosts := allHosts.List() + sizeReport, err := ks.fillSize(ctx, client, hosts) if err != nil { - return err + return nil, err } + ks.fillOptimize(target.SmallTableThreshold) - p.MaxHostIntensity = hostMaxRanges(shards, memory) - return nil -} - -func (p *plan) MarkDeleted(keyspace, table string) { - for _, kp := range p.Keyspaces { - if kp.Keyspace != keyspace { - continue - } - for tabIdx, tp := range kp.Tables { - if tp.Table == table { - kp.Tables[tabIdx].Deleted = true - return - } - } - } -} - -func (p *plan) MarkDoneRanges(keyspace, table string, cnt int) { - for _, kp := range p.Keyspaces { - if kp.Keyspace != keyspace { - continue - } - for tabIdx, tp := range kp.Tables { - if tp.Table == table { - kp.Tables[tabIdx].Done += cnt - return - } - } - } -} - -// FillSize sets size and optimize of each table. 
-func (p *plan) FillSize(ctx context.Context, client *scyllaclient.Client, smallTableThreshold int64) error { - var hkts []scyllaclient.HostKeyspaceTable - hosts := p.Hosts() - for _, kp := range p.Keyspaces { - for _, tp := range kp.Tables { - for _, h := range hosts { - hkts = append(hkts, scyllaclient.HostKeyspaceTable{Host: h, Keyspace: kp.Keyspace, Table: tp.Table}) - } - } - } - - report, err := client.TableDiskSizeReport(ctx, hkts) + // Update max host intensity + mhi, err := maxHostIntensity(ctx, client, hosts) if err != nil { - return errors.Wrap(err, "fetch table disk size report") + return nil, errors.Wrap(err, "calculate max host intensity") } - ksSize := make(map[string]int64) - tableSize := make(map[string]int64) - p.HostTableSize = make(map[scyllaclient.HostKeyspaceTable]int64, len(hkts)) - for i, size := range report { - ksSize[hkts[i].Keyspace] += size - tableSize[hkts[i].Keyspace+"."+hkts[i].Table] += size - p.HostTableSize[scyllaclient.HostKeyspaceTable{ - Host: hkts[i].Host, - Keyspace: hkts[i].Keyspace, - Table: hkts[i].Table, - }] = size - } - - for i, kp := range p.Keyspaces { - p.Keyspaces[i].Size = ksSize[kp.Keyspace] - for j := range kp.Tables { - kp.Tables[j].Size = tableSize[kp.Keyspace+"."+kp.Tables[j].Table] - // Return merged ranges for small, fully replicated table (#3128) - if kp.Tables[j].Size < smallTableThreshold && len(kp.Replicas) == 1 { - kp.Tables[j].Optimize = true - } - } - } - - return nil + return &plan{ + Keyspaces: ks, + Hosts: hosts, + MaxParallel: maxP, + MaxHostIntensity: mhi, + Stats: newStats(sizeReport, ranges), + }, nil } // ViewSort ensures that views are repaired after base tables. @@ -281,207 +174,200 @@ func (p *plan) SizeSort() { } } -// TableSizeMap returns recorded size of repaired tables. 
-func (p *plan) TableSizeMap() map[string]int64 { - out := make(map[string]int64) - for _, kp := range p.Keyspaces { - for _, tp := range kp.Tables { - out[kp.Keyspace+"."+tp.Table] = tp.Size +// FilteredUnits returns repaired tables in unit format. +func (p *plan) FilteredUnits(units []Unit) []Unit { + allTables := make(map[string]int) + for _, u := range units { + if u.AllTables { + allTables[u.Keyspace] = len(u.Tables) + } else { + allTables[u.Keyspace] = -1 } } - return out -} - -// KeyspaceRangesMap returns ranges count of repaired keyspaces. -// All tables in the same keyspace have the same ranges count. -func (p *plan) KeyspaceRangesMap() map[string]int64 { - out := make(map[string]int64) + var out []Unit for _, kp := range p.Keyspaces { - out[kp.Keyspace] = int64(len(kp.TokenRepIdx)) + u := Unit{ + Keyspace: kp.Keyspace, + AllTables: allTables[kp.Keyspace] == len(kp.Tables), + } + for _, tp := range kp.Tables { + u.Tables = append(u.Tables, tp.Table) + } + out = append(out, u) } return out } -func hostMaxRanges(shards map[string]uint, memory map[string]int64) map[string]Intensity { - out := make(map[string]Intensity, len(shards)) - for h, sh := range shards { - out[h] = maxRepairRangesInParallel(sh, memory[h]) +func (p keyspacePlans) fillSize(ctx context.Context, client *scyllaclient.Client, hosts []string) ([]scyllaclient.SizeReport, error) { + var hkts []scyllaclient.HostKeyspaceTable + for _, ksp := range p { + for _, tp := range ksp.Tables { + for _, h := range hosts { + hkts = append(hkts, newHostKsTable(h, ksp.Keyspace, tp.Table)) + } + } } - return out -} -func maxRepairRangesInParallel(shards uint, totalMemory int64) Intensity { - const MiB = 1024 * 1024 - memoryPerShard := totalMemory / int64(shards) - max := int(0.1 * float64(memoryPerShard) / (32 * MiB) / 4) - if max == 0 { - max = 1 + sizeReport, err := client.TableDiskSizeReport(ctx, hkts) + if err != nil { + return nil, errors.Wrap(err, "calculate tables size") } - return NewIntensity(max) -} 
-// keyspacePlan describes repair schedule and state for keyspace. -type keyspacePlan struct { - Keyspace string - Size int64 - - // All tables in the same keyspace share the same replicas and ranges - Tables []tablePlan - // Idx of currently repaired table - Idx int - AllTables bool - - Replicas []scyllaclient.ReplicaTokenRanges - // Maps token range to replica set (by index) that owns it. - // Contains all token ranges as entries. - TokenRepIdx map[scyllaclient.TokenRange]int -} - -func (kp keyspacePlan) IsReplicaMarked(repIdx, tabIdx int) bool { - return len(kp.Replicas[repIdx].Ranges) == kp.Tables[tabIdx].MarkedInReplica[repIdx] -} - -func (kp keyspacePlan) IsTableRepaired(tabIdx int) bool { - return len(kp.TokenRepIdx) == kp.Tables[tabIdx].Done -} - -// GetRangesToRepair returns at most cnt ranges of table owned by replica set. -func (kp keyspacePlan) GetRangesToRepair(repIdx, tabIdx int, intensity Intensity) []scyllaclient.TokenRange { - rep := kp.Replicas[repIdx] - tp := kp.Tables[tabIdx] - - // Return all ranges for optimized or deleted table - if tp.Optimize || tp.Deleted { - intensity = NewIntensity(len(rep.Ranges)) + ksSize := make(map[string]int64) + tableSize := make(map[string]int64) + for _, sr := range sizeReport { + ksSize[sr.Keyspace] += sr.Size + tableSize[sr.Keyspace+"."+sr.Table] += sr.Size } - var out []scyllaclient.TokenRange - for _, r := range rep.Ranges { - if tp.MarkRange(repIdx, r) { - out = append(out, r) - if NewIntensity(len(out)) >= intensity { - break - } + for i, ksp := range p { + p[i].Size = ksSize[ksp.Keyspace] + for j, tp := range ksp.Tables { + ksp.Tables[j].Size = tableSize[ksp.Keyspace+"."+tp.Table] } } - - return out + return sizeReport, nil } -// maxParallel returns maximal repair parallelism limited to keyspace. 
-func (kp keyspacePlan) maxParallel(dcMap map[string][]string) int { - min := math.MaxInt - for _, dcHosts := range dcMap { - // Max keyspace parallel is equal to the smallest max DC parallel - if cand := kp.maxDCParallel(dcHosts); cand < min { - min = cand +func (p keyspacePlans) fillOptimize(smallTableThreshold int64) { + for _, ksp := range p { + for j, tp := range ksp.Tables { + // Return merged ranges for small, fully replicated table (#3128) + if tp.Size < smallTableThreshold && tp.ReplicaSetCnt == 1 { + ksp.Tables[j].Optimize = true + } } } - return min } -// maxDCParallel returns maximal repair parallelism limited to keyspace and nodes of given dc. -func (kp keyspacePlan) maxDCParallel(dcHosts []string) int { - var max int - filteredDCHosts := setIntersection(strset.New(dcHosts...), strset.New(kp.Hosts()...)) - // Not repaired DC does not have any limits on parallel - if filteredDCHosts.Size() == 0 { - return math.MaxInt +// ShouldRepairRing when all ranges are replicated (len(replicaSet) > 1) in specified dcs. +// If host is set, it also checks if host belongs to the dcs. +func ShouldRepairRing(ring scyllaclient.Ring, dcs []string, host string) bool { + repairedDCs := strset.New(dcs...) 
+ if host != "" { + if dc, ok := ring.HostDC[host]; !ok || !repairedDCs.Has(dc) { + return false + } } - for _, rep := range kp.Replicas { - filteredRepSet := setIntersection(strset.New(rep.ReplicaSet...), filteredDCHosts) - if filteredRepSet.Size() == 0 { - continue + switch ring.Replication { + case scyllaclient.SimpleStrategy: + // Check range consisting of excluded hosts + excluded := 0 + for _, dc := range ring.HostDC { + if !repairedDCs.Has(dc) { + excluded++ + } } - // Max DC parallel is equal to #(repaired nodes from DC) / #(smallest partial replica set from DC) - if cand := filteredDCHosts.Size() / filteredRepSet.Size(); max < cand { - max = cand + return ring.RF > excluded+1 + case scyllaclient.NetworkTopologyStrategy: + rep := 0 + for dc, rf := range ring.DCrf { + if repairedDCs.Has(dc) { + rep += rf + } } + return rep > 1 + default: + return false } - - return max } -// Hosts returns all hosts taking part in keyspace repair. -func (kp keyspacePlan) Hosts() []string { - out := strset.New() - for _, rep := range kp.Replicas { - out.Add(rep.ReplicaSet...) +// maxHostIntensity sets max_ranges_in_parallel for all repaired host. 
+func maxHostIntensity(ctx context.Context, client *scyllaclient.Client, hosts []string) (map[string]Intensity, error) { + shards, err := client.HostsShardCount(ctx, hosts) + if err != nil { + return nil, err + } + memory, err := client.HostsTotalMemory(ctx, hosts) + if err != nil { + return nil, err } - return out.List() + return hostMaxRanges(shards, memory), nil } -func setIntersection(s1, s2 *strset.Set) *strset.Set { - out := strset.New() - if s2.Size() < s1.Size() { - s1, s2 = s2, s1 +func hostMaxRanges(shards map[string]uint, memory map[string]int64) map[string]Intensity { + out := make(map[string]Intensity, len(shards)) + for h, sh := range shards { + out[h] = maxRepairRangesInParallel(sh, memory[h]) } - s1.Each(func(item string) bool { - if s2.Has(item) { - out.Add(item) - } - return true - }) return out } -// tablePlan describes repair schedule and state for table. -type tablePlan struct { - Table string - Size int64 - // Deleted tables are still being sent to workers, - // so that their progress can still be updated in a fake way, - // as their jobs are not actually sent to Scylla. - Deleted bool - // Optimized tables (small and fully replicated) - // have all ranges for replica set repaired in a single job. - Optimize bool - // Marks scheduled ranges. - MarkedRanges map[scyllaclient.TokenRange]struct{} - // Marks amount of scheduled ranges in replica set (by index). - MarkedInReplica []int - // Amount of scheduled and finished ranges. - Done int +func maxRepairRangesInParallel(shards uint, totalMemory int64) Intensity { + const MiB = 1024 * 1024 + memoryPerShard := totalMemory / int64(shards) + maxI := int(0.1 * float64(memoryPerShard) / (32 * MiB) / 4) + if maxI == 0 { + maxI = 1 + } + return NewIntensity(maxI) } -// MarkRange sets range as done for replica. 
-func (tp tablePlan) MarkRange(repIdx int, r scyllaclient.TokenRange) bool { - if _, ok := tp.MarkedRanges[r]; !ok { - tp.MarkedRanges[r] = struct{}{} - tp.MarkedInReplica[repIdx]++ - return true +// MaxRingParallel calculates max amount of repair jobs on ring limited to dcs. +func MaxRingParallel(ring scyllaclient.Ring, dcs []string) int { + repairedDCs := strset.New(dcs...) + dcNodeCnt := make(map[string]int) + for _, dc := range ring.HostDC { + dcNodeCnt[dc]++ + } + + switch ring.Replication { + case scyllaclient.SimpleStrategy: + repaired := 0 + for dc, cnt := range dcNodeCnt { + if repairedDCs.Has(dc) { + repaired += cnt + } + } + return repaired / ring.RF + case scyllaclient.NetworkTopologyStrategy: + minDC := math.MaxInt / 2 + for dc, rf := range ring.DCrf { + if repairedDCs.Has(dc) { + minDC = min(minDC, dcNodeCnt[dc]/rf) + } + } + if minDC == math.MaxInt/2 { + minDC = 1 + } + return minDC + default: + return 1 } - return false } -// filteredHosts returns hosts passing '--dc' and '--ignore-down-hosts' criteria. -func filteredHosts(target Target, status scyllaclient.NodeStatusInfoSlice) *strset.Set { - ignoredHosts := strset.New(target.IgnoreHosts...) - dcs := strset.New(target.DC...) - filtered := strset.New() +// Filters replica set according to --dc, --ignore-down-hosts, --host. +func filterReplicaSet(replicaSet []string, hostDC map[string]string, target Target) []string { + if target.Host != "" && !slice.ContainsString(replicaSet, target.Host) { + return nil + } - for _, node := range status { - if !ignoredHosts.Has(node.Addr) && dcs.Has(node.Datacenter) { - filtered.Add(node.Addr) + var out []string + for _, h := range replicaSet { + if slice.ContainsString(target.DC, hostDC[h]) && !slice.ContainsString(target.IgnoreHosts, h) { + out = append(out, h) } } - return filtered + return out } -// filterReplicaSet returns hosts present in filteredHosts and passing '--host' criteria. 
-func filteredReplicaSet(replicaSet []string, filteredHosts *strset.Set, host string) []string { - var out []string - for _, r := range replicaSet { - if filteredHosts.Has(r) { - out = append(out, r) +func newStats(sizeReport []scyllaclient.SizeReport, ranges map[scyllaclient.HostKeyspaceTable]int) map[scyllaclient.HostKeyspaceTable]tableStats { + out := make(map[scyllaclient.HostKeyspaceTable]tableStats, len(sizeReport)) + for _, sr := range sizeReport { + out[sr.HostKeyspaceTable] = tableStats{ + Size: sr.Size, + Ranges: ranges[sr.HostKeyspaceTable], } } + return out +} - if host != "" && !slice.ContainsString(out, host) { - out = nil +func newHostKsTable(host, ks, table string) scyllaclient.HostKeyspaceTable { + return scyllaclient.HostKeyspaceTable{ + Host: host, + Keyspace: ks, + Table: table, } - - return out } diff --git a/pkg/service/repair/progress.go b/pkg/service/repair/progress.go index fb8fe7c2b6..e0b76a48e3 100644 --- a/pkg/service/repair/progress.go +++ b/pkg/service/repair/progress.go @@ -31,19 +31,12 @@ type ProgressManager interface { OnJobStart(ctx context.Context, job job) // OnJobEnd must be called when single repair job is finished. OnJobEnd(ctx context.Context, job jobResult) - // UpdatePlan marks plan's ranges which were already successfully - // repaired in the previous run. - UpdatePlan(plan *plan) + // GetCompletedRanges returns ranges already successfully repaired in the previous runs. + GetCompletedRanges(keyspace, table string) []scyllaclient.TokenRange // AggregateProgress fetches RunProgress from DB and aggregates them into Progress. 
AggregateProgress() (Progress, error) } -type progressKey struct { - host string - keyspace string - table string -} - type stateKey struct { keyspace string table string @@ -55,14 +48,14 @@ type dbProgressManager struct { total atomic.Float64 // Total weighted repair progress tableSize map[string]int64 // Maps table to its size totalTableSize int64 // Sum over tableSize - keyspaceRanges map[string]int64 // Maps keyspace to its range count + tableRanges map[string]int // Maps table to its range count session gocqlx.Session metrics metrics.RepairMetrics logger log.Logger mu sync.Mutex - progress map[progressKey]*RunProgress + progress map[scyllaclient.HostKeyspaceTable]*RunProgress state map[stateKey]*RunState } @@ -79,16 +72,20 @@ func NewDBProgressManager(run *Run, session gocqlx.Session, metrics metrics.Repa func (pm *dbProgressManager) Init(plan *plan, prevID uuid.UUID) error { pm.run.PrevID = prevID - pm.total.Store(0) pm.metrics.SetProgress(pm.run.ClusterID, 0) - pm.tableSize = plan.TableSizeMap() - var total int64 - for _, s := range pm.tableSize { - total += s + + pm.tableSize = make(map[string]int64) + pm.totalTableSize = 0 + pm.tableRanges = make(map[string]int) + for _, ksp := range plan.Keyspaces { + for _, tp := range ksp.Tables { + fullName := ksp.Keyspace + "." 
+ tp.Table + pm.tableSize[fullName] = tp.Size + pm.totalTableSize += tp.Size + pm.tableRanges[fullName] = tp.RangesCnt + } } - pm.totalTableSize = total - pm.keyspaceRanges = plan.KeyspaceRangesMap() // Init state before progress, so that we don't resume progress when state is empty if err := pm.initState(plan); err != nil { @@ -153,35 +150,21 @@ func (pm *dbProgressManager) isRunInitialized(run *Run) bool { } func (pm *dbProgressManager) initProgress(plan *plan) error { - pm.progress = make(map[progressKey]*RunProgress) + pm.progress = make(map[scyllaclient.HostKeyspaceTable]*RunProgress) // Fill all possible progress entries (#tables * #nodes) for _, kp := range plan.Keyspaces { for _, tp := range kp.Tables { - for _, rep := range kp.Replicas { - for _, h := range rep.ReplicaSet { - pk := progressKey{ - host: h, - keyspace: kp.Keyspace, - table: tp.Table, - } - rp := pm.progress[pk] - if rp == nil { - rp = &RunProgress{ - ClusterID: pm.run.ClusterID, - TaskID: pm.run.TaskID, - RunID: pm.run.ID, - Host: pk.host, - Keyspace: pk.keyspace, - Table: pk.table, - Size: plan.HostTableSize[scyllaclient.HostKeyspaceTable{ - Host: pk.host, - Keyspace: pk.keyspace, - Table: pk.table, - }], - } - pm.progress[pk] = rp - } - rp.TokenRanges += int64(len(rep.Ranges)) + for _, h := range plan.Hosts { + pk := newHostKsTable(h, kp.Keyspace, tp.Table) + pm.progress[newHostKsTable(h, kp.Keyspace, tp.Table)] = &RunProgress{ + ClusterID: pm.run.ClusterID, + TaskID: pm.run.TaskID, + RunID: pm.run.ID, + Host: pk.Host, + Keyspace: pk.Keyspace, + Table: pk.Table, + Size: plan.Stats[pk].Size, + TokenRanges: int64(plan.Stats[pk].Ranges), } } } @@ -191,11 +174,7 @@ func (pm *dbProgressManager) initProgress(plan *plan) error { // Watch out for empty state after upgrade (#3534). 
if !pm.emptyState() { err := pm.ForEachPrevRunProgress(func(rp *RunProgress) { - pk := progressKey{ - host: rp.Host, - keyspace: rp.Keyspace, - table: rp.Table, - } + pk := newHostKsTable(rp.Host, rp.Keyspace, rp.Table) if _, ok := pm.progress[pk]; !ok { return } @@ -282,23 +261,12 @@ func (pm *dbProgressManager) emptyState() bool { return true } -// UpdatePlan marks already repaired token ranges in plan using state. -func (pm *dbProgressManager) UpdatePlan(plan *plan) { - for _, kp := range plan.Keyspaces { - for tabIdx, tp := range kp.Tables { - sk := stateKey{ - keyspace: kp.Keyspace, - table: tp.Table, - } - // Skip only successfully repaired ranges - for _, r := range pm.state[sk].SuccessRanges { - if repIdx, ok := kp.TokenRepIdx[r]; ok { - _ = tp.MarkRange(repIdx, r) - kp.Tables[tabIdx].Done++ - } - } - } +func (pm *dbProgressManager) GetCompletedRanges(keyspace, table string) []scyllaclient.TokenRange { + sk := stateKey{ + keyspace: keyspace, + table: table, } + return pm.state[sk].SuccessRanges } func (pm *dbProgressManager) OnJobStart(ctx context.Context, j job) { @@ -307,11 +275,7 @@ func (pm *dbProgressManager) OnJobStart(ctx context.Context, j job) { defer q.Release() for _, h := range j.replicaSet { - pk := progressKey{ - host: h, - keyspace: j.keyspace, - table: j.table, - } + pk := newHostKsTable(h, j.keyspace, j.table) pm.mu.Lock() rp := pm.progress[pk] @@ -346,11 +310,7 @@ func (pm *dbProgressManager) onJobEndProgress(ctx context.Context, result jobRes defer q.Release() for _, h := range result.replicaSet { - pk := progressKey{ - host: h, - keyspace: result.keyspace, - table: result.table, - } + pk := newHostKsTable(h, result.keyspace, result.table) pm.mu.Lock() rp := pm.progress[pk] @@ -421,8 +381,8 @@ func (pm *dbProgressManager) updateTotalProgress(keyspace, table string, ranges totalWeight = pm.totalTableSize } - delta := float64(ranges) / float64(pm.keyspaceRanges[keyspace]) * 100 // Not weighted percentage progress delta - delta *= 
float64(weight) / float64(totalWeight) // Apply weight + delta := float64(ranges) / float64(pm.tableRanges[keyspace]) * 100 // Not weighted percentage progress delta + delta *= float64(weight) / float64(totalWeight) // Apply weight // Watch out for rounding over 100% errors if total := pm.total.Add(delta); total <= 100 { diff --git a/pkg/service/repair/progress_integration_test.go b/pkg/service/repair/progress_integration_test.go index e727fe8d63..4af8bd99d0 100644 --- a/pkg/service/repair/progress_integration_test.go +++ b/pkg/service/repair/progress_integration_test.go @@ -7,6 +7,7 @@ package repair import ( "context" + "slices" "testing" "time" @@ -47,31 +48,22 @@ func TestProgressManagerIntegration(t *testing.T) { StartToken: 0, EndToken: 10, } - token2 = scyllaclient.TokenRange{ - StartToken: 11, - EndToken: 20, - } p = &plan{ + Hosts: []string{"h1", "h2"}, + Stats: map[scyllaclient.HostKeyspaceTable]tableStats{ + newHostKsTable("h1", "k1", "t1"): { + Size: 5, + Ranges: 2, + }, + newHostKsTable("h2", "k1", "t1"): { + Size: 7, + Ranges: 2, + }, + }, Keyspaces: []keyspacePlan{ { Keyspace: "k1", - Tables: []tablePlan{ - { - Table: "t1", - MarkedRanges: make(map[scyllaclient.TokenRange]struct{}), - MarkedInReplica: make([]int, 1), - }, - }, - Replicas: []scyllaclient.ReplicaTokenRanges{ - { - ReplicaSet: []string{"h1", "h2"}, - Ranges: []scyllaclient.TokenRange{token1, token2}, - }, - }, - TokenRepIdx: map[scyllaclient.TokenRange]int{ - token1: 0, - token2: 0, - }, + Tables: []tablePlan{{Table: "t1"}}, }, }, } @@ -97,6 +89,7 @@ func TestProgressManagerIntegration(t *testing.T) { Host: "h1", Keyspace: "k1", Table: "t1", + Size: 5, TokenRanges: 2, Success: 0, Error: 0, @@ -108,6 +101,7 @@ func TestProgressManagerIntegration(t *testing.T) { Host: "h2", Keyspace: "k1", Table: "t1", + Size: 7, TokenRanges: 2, Success: 0, Error: 0, @@ -193,35 +187,23 @@ func TestProgressManagerIntegration(t *testing.T) { StartToken: 5, EndToken: 10, } - token2 = scyllaclient.TokenRange{ 
- StartToken: 15, - EndToken: 30, - } token3 = scyllaclient.TokenRange{ StartToken: 50, EndToken: 100, } p = &plan{ // Plan containing token1 and token2 + Stats: map[scyllaclient.HostKeyspaceTable]tableStats{ + newHostKsTable("h1", "k1", "t1"): { + Ranges: 1, + }, + newHostKsTable("h2", "k1", "t1"): { + Ranges: 1, + }, + }, Keyspaces: []keyspacePlan{ { Keyspace: "k1", - Tables: []tablePlan{ - { - Table: "t1", - MarkedRanges: make(map[scyllaclient.TokenRange]struct{}), - MarkedInReplica: make([]int, 1), - }, - }, - Replicas: []scyllaclient.ReplicaTokenRanges{ - { - ReplicaSet: []string{"h1", "h2"}, - Ranges: []scyllaclient.TokenRange{token1}, - }, - }, - TokenRepIdx: map[scyllaclient.TokenRange]int{ - token1: 0, - token2: 0, - }, + Tables: []tablePlan{{Table: "t1"}}, }, }, } @@ -265,18 +247,10 @@ func TestProgressManagerIntegration(t *testing.T) { if err := pm.Init(p, prevID); err != nil { t.Fatal(err) } - pm.UpdatePlan(p) - + done := pm.GetCompletedRanges("k1", "t1") Print("Then: validate marked token1 and not marked token3") - tp := p.Keyspaces[0].Tables[0] - if len(tp.MarkedRanges) != 1 { - t.Fatal("expected 1 marked range") - } - if _, ok := tp.MarkedRanges[token1]; !ok { - t.Fatal("expected token1 to be marked") - } - if tp.MarkedInReplica[0] != 1 { - t.Fatal("expected 1 marked range in the first replica set") + if len(done) != 2 || !slices.Contains(done, token1) || !slices.Contains(done, token3) { + t.Fatal("expected both token ranges to be done") } }) } diff --git a/pkg/service/repair/repair_test.go b/pkg/service/repair/repair_test.go new file mode 100644 index 0000000000..6d4d517c24 --- /dev/null +++ b/pkg/service/repair/repair_test.go @@ -0,0 +1,291 @@ +// Copyright (C) 2024 ScyllaDB + +package repair_test + +import ( + "fmt" + "testing" + + "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" + "github.com/scylladb/scylla-manager/v3/pkg/service/repair" +) + +func TestMaxRingParallel(t *testing.T) { + hostDC := map[string]string{ + // dc1 -> 3 + "h1": 
"dc1", + "h2": "dc1", + "h3": "dc1", + // dc2 -> 4 + "h4": "dc2", + "h5": "dc2", + "h6": "dc2", + "h7": "dc2", + // dc3 -> 5 + "h8": "dc3", + "h9": "dc3", + "h10": "dc3", + "h11": "dc3", + "h12": "dc3", + } + + testCases := []struct { + Ring scyllaclient.Ring + DCs []string + Expected int + }{ + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.SimpleStrategy, + RF: 4, + }, + DCs: []string{"dc1", "dc2", "dc3"}, + Expected: 3, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.SimpleStrategy, + RF: 3, + }, + DCs: []string{"dc1", "dc2"}, + Expected: 2, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.NetworkTopologyStrategy, + RF: 5, + DCrf: map[string]int{ + "dc1": 1, + "dc2": 2, + "dc3": 2, + }, + }, + DCs: []string{"dc1", "dc2", "dc3"}, + Expected: 2, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.NetworkTopologyStrategy, + RF: 8, + DCrf: map[string]int{ + "dc1": 1, + "dc2": 2, + "dc3": 5, + }, + }, + DCs: []string{"dc1", "dc2"}, + Expected: 2, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.NetworkTopologyStrategy, + RF: 4, + DCrf: map[string]int{ + "dc1": 2, + "dc2": 1, + "dc3": 1, + }, + }, + DCs: []string{"dc1", "dc3"}, + Expected: 1, + }, + } + + for i := range testCases { + tc := testCases[i] + t.Run("test "+fmt.Sprint(i), func(t *testing.T) { + t.Parallel() + if out := repair.MaxRingParallel(tc.Ring, tc.DCs); out != tc.Expected { + t.Fatalf("Expected %d, got %d", tc.Expected, out) + } + }) + } +} + +func TestShouldRepairRing(t *testing.T) { + hostDC := map[string]string{ + // dc1 -> 1 + "h1": "dc1", + // dc2 -> 1 + "h2": "dc2", + // dc3 -> 3 + "h3": "dc3", + "h4": "dc3", + "h5": "dc3", + // dc3 -> 4 + "h6": "dc4", + "h7": "dc4", + "h8": "dc4", + "h9": "dc4", + } + + testCases := []struct { + Ring scyllaclient.Ring + DCs []string + Host string + Expected bool + }{ + { + Ring: scyllaclient.Ring{ + 
HostDC: hostDC, + Replication: scyllaclient.LocalStrategy, + RF: 1, + }, + DCs: []string{"dc1", "dc2", "dc3", "dc4"}, + Expected: false, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.SimpleStrategy, + RF: 1, + }, + DCs: []string{"dc1", "dc2", "dc3", "dc4"}, + Expected: false, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.SimpleStrategy, + RF: 2, + }, + DCs: []string{"dc1", "dc2", "dc3", "dc4"}, + Expected: true, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.SimpleStrategy, + RF: 3, + }, + DCs: []string{"dc3", "dc4"}, + Expected: false, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.SimpleStrategy, + RF: 4, + }, + DCs: []string{"dc3", "dc4"}, + Expected: true, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.SimpleStrategy, + RF: 4, + }, + DCs: []string{"dc4"}, + Expected: false, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.NetworkTopologyStrategy, + RF: 4, + DCrf: map[string]int{ + "dc1": 1, + "dc2": 1, + "dc3": 1, + "dc4": 1, + }, + }, + DCs: []string{"dc1", "dc2", "dc3", "dc4"}, + Expected: true, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.NetworkTopologyStrategy, + RF: 4, + DCrf: map[string]int{ + "dc1": 1, + "dc2": 1, + "dc3": 1, + "dc4": 1, + }, + }, + DCs: []string{"dc1"}, + Expected: false, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.NetworkTopologyStrategy, + RF: 8, + DCrf: map[string]int{ + "dc1": 1, + "dc2": 1, + "dc3": 2, + "dc4": 2, + }, + }, + DCs: []string{"dc3"}, + Expected: true, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.NetworkTopologyStrategy, + RF: 8, + DCrf: map[string]int{ + "dc1": 1, + "dc2": 1, + "dc3": 3, + "dc4": 4, + }, + }, + DCs: []string{"dc4"}, + Host: "h6", + Expected: true, + }, + { + Ring: scyllaclient.Ring{ + 
HostDC: hostDC, + Replication: scyllaclient.NetworkTopologyStrategy, + RF: 8, + DCrf: map[string]int{ + "dc1": 1, + "dc2": 1, + "dc3": 3, + "dc4": 4, + }, + }, + DCs: []string{"dc4"}, + Host: "h2", + Expected: false, + }, + { + Ring: scyllaclient.Ring{ + HostDC: hostDC, + Replication: scyllaclient.NetworkTopologyStrategy, + RF: 8, + DCrf: map[string]int{ + "dc1": 1, + "dc2": 1, + "dc3": 3, + "dc4": 4, + }, + }, + DCs: []string{"dc1", "dc2", "dc3", "dc4"}, + Host: "h1", + Expected: true, + }, + } + + for i := range testCases { + tc := testCases[i] + t.Run("test "+fmt.Sprint(i), func(t *testing.T) { + t.Parallel() + if out := repair.ShouldRepairRing(tc.Ring, tc.DCs, tc.Host); out != tc.Expected { + t.Fatalf("Expected %v, got %v", tc.Expected, out) + } + }) + } +} diff --git a/pkg/service/repair/service.go b/pkg/service/repair/service.go index b9f36ed279..07fdfce17a 100644 --- a/pkg/service/repair/service.go +++ b/pkg/service/repair/service.go @@ -149,13 +149,13 @@ func (s *Service) GetTarget(ctx context.Context, clusterID uuid.UUID, properties return t, errors.Wrap(ErrEmptyRepair, err.Error()) } - t.plan, err = newPlan(ctx, t, client) + p, err := newPlan(ctx, t, client) if err != nil { return t, errors.Wrap(err, "create repair plan") } // Sort plan - t.plan.SizeSort() - t.plan.PrioritySort(NewInternalTablePreference()) + p.SizeSort() + p.PrioritySort(NewInternalTablePreference()) if clusterSession, err := s.clusterSession(ctx, clusterID); err != nil { s.logger.Info(ctx, "No cluster credentials, couldn't ensure repairing base table before its views", "error", err) } else { @@ -163,23 +163,11 @@ func (s *Service) GetTarget(ctx context.Context, clusterID uuid.UUID, properties if err != nil { return t, errors.Wrap(err, "get cluster views") } - t.plan.ViewSort(views) - } - - t.plan.SetMaxParallel(dcMap) - if err := t.plan.SetMaxHostIntensity(ctx, client); err != nil { - return t, errors.Wrap(err, "calculate intensity limits") - } - - if len(t.plan.SkippedKeyspaces) > 0 { 
- s.logger.Info(ctx, - "Repair of the following keyspaces will be skipped because not all the tokens are present in the specified DCs", - "keyspaces", strings.Join(t.plan.SkippedKeyspaces, ", "), - ) + p.ViewSort(views) } // Set filtered units as they are still used for displaying --dry-run - t.Units = t.plan.Units() + t.Units = p.FilteredUnits(t.Units) return t, nil } @@ -202,7 +190,6 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID "run_id", runID, "target", target, ) - run := &Run{ ClusterID: clusterID, TaskID: taskID, @@ -222,6 +209,24 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID return errors.Wrap(err, "get client") } + p, err := newPlan(ctx, target, client) + if err != nil { + return errors.Wrap(err, "create repair plan") + } + var ord int + for _, kp := range p.Keyspaces { + for _, tp := range kp.Tables { + ord++ + s.logger.Info(ctx, "Repair order", + "order", ord, + "keyspace", kp.Keyspace, + "table", tp.Table, + "size", tp.Size, + "merge_ranges", tp.Optimize, + ) + } + } + pm := NewDBProgressManager(run, s.session, s.metrics, s.logger) prevID := uuid.Nil if target.Continue { @@ -232,10 +237,10 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID run.Intensity = prev.Intensity } } - if err := pm.Init(target.plan, prevID); err != nil { + + if err := pm.Init(p, prevID); err != nil { return err } - pm.UpdatePlan(target.plan) s.putRunLogError(ctx, run) gracefulCtx, cancel := context.WithCancel(context.Background()) @@ -254,7 +259,7 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID // Give intensity handler the ability to set pool size ih, cleanup := s.newIntensityHandler(ctx, clusterID, taskID, runID, - target.plan.MaxHostIntensity, target.plan.MaxParallel, workers) + p.MaxHostIntensity, p.MaxParallel, workers) defer cleanup() // Set controlled parameters @@ -263,7 +268,7 @@ func (s *Service) Repair(ctx context.Context, 
clusterID, taskID, runID uuid.UUID // Give generator the ability to read parallel/intensity and // to submit and receive results from worker pool. - gen, err := newGenerator(ctx, target, client, ih, workers, s.logger) + gen, err := newGenerator(ctx, target, client, ih, workers, p, pm, s.logger) if err != nil { return errors.Wrap(err, "create generator") } @@ -280,15 +285,14 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID } }() - hosts := target.plan.Hosts() - if active, err := client.ActiveRepairs(ctx, hosts); err != nil { + if active, err := client.ActiveRepairs(ctx, p.Hosts); err != nil { s.logger.Error(ctx, "Active repair check failed", "error", err) } else if len(active) > 0 { return errors.Errorf("ensure no active repair on hosts, %s are repairing", strings.Join(active, ", ")) } if err = gen.Run(gracefulCtx); (err != nil && target.FailFast) || ctx.Err() != nil { - s.killAllRepairs(ctx, client, hosts) + s.killAllRepairs(ctx, client, p.Hosts) } close(done) @@ -298,7 +302,7 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID s.putRunLogError(ctx, run) } // Ensure that not interrupted repair has 100% progress (invalidate rounding errors). 
- if gen.lastPercent == 100 { + if ctx.Err() == nil && (!target.FailFast || err == nil) { s.metrics.SetProgress(clusterID, 100) } diff --git a/pkg/service/repair/service_repair_integration_test.go b/pkg/service/repair/service_repair_integration_test.go index dd440ca991..214c94c118 100644 --- a/pkg/service/repair/service_repair_integration_test.go +++ b/pkg/service/repair/service_repair_integration_test.go @@ -1648,7 +1648,7 @@ func TestServiceRepairIntegration(t *testing.T) { h.assertProgress(IPFromTestNet("12"), 60, longWait) Print("And: repair contains error") - h.assertErrorContains("token ranges out of", longWait) + h.assertErrorContains("logs", longWait) }) t.Run("repair error fail fast", func(t *testing.T) { diff --git a/pkg/util/maputil/map.go b/pkg/util/maputil/map.go new file mode 100644 index 0000000000..6ae9e5234e --- /dev/null +++ b/pkg/util/maputil/map.go @@ -0,0 +1,16 @@ +// Copyright (C) 2024 ScyllaDB + +package maputil + +// Equal checks if m1 and m2 are the same mapping. +func Equal[K, V comparable](m1, m2 map[K]V) bool { + if len(m1) != len(m2) { + return false + } + for k, v1 := range m1 { + if v2, ok := m2[k]; !ok || v1 != v2 { + return false + } + } + return true +}