Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repair: describe ring per table and without cache #3718

Merged
merged 6 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions pkg/scyllaclient/client_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/scylla-manager/v3/pkg/dht"
"github.com/scylladb/scylla-manager/v3/pkg/util/maputil"
"github.com/scylladb/scylla-manager/v3/pkg/util/slice"
"go.uber.org/multierr"

Expand Down Expand Up @@ -383,6 +384,9 @@ func (c *Client) DescribeRing(ctx context.Context, keyspace string) (Ring, error
if err != nil {
return Ring{}, err
}
if len(resp.Payload) == 0 {
return Ring{}, errors.New("received empty token range list")
}

ring := Ring{
ReplicaTokens: make([]ReplicaTokenRanges, 0),
Expand All @@ -393,6 +397,9 @@ func (c *Client) DescribeRing(ctx context.Context, keyspace string) (Ring, error
replicaTokens := make(map[uint64][]TokenRange)
replicaHash := make(map[uint64][]string)

isNetworkTopologyStrategy := true
rf := len(resp.Payload[0].Endpoints)
var dcRF map[string]int
for _, p := range resp.Payload {
// Parse tokens
startToken, err := strconv.ParseInt(p.StartToken, 10, 64)
Expand All @@ -415,6 +422,21 @@ func (c *Client) DescribeRing(ctx context.Context, keyspace string) (Ring, error
EndToken: endToken,
})

// Update replication factors
if rf != len(p.Endpoints) {
return Ring{}, errors.Errorf("ifferent token ranges have different rf (%d/%d). Repair is not safe for now", rf, len(p.Endpoints))
}
tokenDCrf := make(map[string]int)
for _, e := range p.EndpointDetails {
tokenDCrf[e.Datacenter]++
}
// NetworkTopologyStrategy -> all token ranges have the same dc to rf mapping
if dcRF == nil || maputil.Equal(dcRF, tokenDCrf) {
dcRF = tokenDCrf
} else {
isNetworkTopologyStrategy = false
}

// Update host to DC mapping
for _, e := range p.EndpointDetails {
ring.HostDC[e.Host] = e.Datacenter
Expand Down Expand Up @@ -443,16 +465,15 @@ func (c *Client) DescribeRing(ctx context.Context, keyspace string) (Ring, error
}

// Detect replication strategy
if len(ring.HostDC) == 1 {
ring.RF = rf
switch {
case len(ring.HostDC) == 1:
ring.Replication = LocalStrategy
} else {
case isNetworkTopologyStrategy:
ring.Replication = NetworkTopologyStrategy
for _, tokens := range dcTokens {
if tokens != len(resp.Payload) {
ring.Replication = SimpleStrategy
break
}
}
ring.DCrf = dcRF
default:
ring.Replication = SimpleStrategy
}

return ring, nil
Expand Down Expand Up @@ -854,8 +875,14 @@ func (t HostKeyspaceTables) Hosts() []string {
return s.List()
}

// SizeReport extends HostKeyspaceTable with Size information.
type SizeReport struct {
HostKeyspaceTable
Size int64
}

// TableDiskSizeReport returns total on disk size of tables in bytes.
func (c *Client) TableDiskSizeReport(ctx context.Context, hostKeyspaceTables HostKeyspaceTables) ([]int64, error) {
func (c *Client) TableDiskSizeReport(ctx context.Context, hostKeyspaceTables HostKeyspaceTables) ([]SizeReport, error) {
// Get shard count of a first node to estimate parallelism limit
shards, err := c.ShardCount(ctx, "")
if err != nil {
Expand All @@ -864,7 +891,7 @@ func (c *Client) TableDiskSizeReport(ctx context.Context, hostKeyspaceTables Hos

var (
limit = len(hostKeyspaceTables.Hosts()) * int(shards)
report = make([]int64, len(hostKeyspaceTables))
report = make([]SizeReport, len(hostKeyspaceTables))
)

f := func(i int) error {
Expand All @@ -881,7 +908,10 @@ func (c *Client) TableDiskSizeReport(ctx context.Context, hostKeyspaceTables Hos
"size", size,
)

report[i] = size
report[i] = SizeReport{
HostKeyspaceTable: v,
Size: size,
}
return nil
}

Expand Down
73 changes: 73 additions & 0 deletions pkg/scyllaclient/client_scylla_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"testing"
"time"

"github.com/scylladb/scylla-manager/v3/pkg/testutils/db"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig"
"github.com/scylladb/scylla-manager/v3/pkg/util/maputil"

"github.com/google/go-cmp/cmp"
"github.com/scylladb/go-log"
Expand Down Expand Up @@ -71,6 +73,77 @@ func TestClientStatusIntegration(t *testing.T) {
}
}

func TestClientDescribeRingIntegration(t *testing.T) {
testCases := []struct {
replicationStmt string
replication scyllaclient.ReplicationStrategy
rf int
dcRF map[string]int
}{
{
replicationStmt: "{'class': 'SimpleStrategy', 'replication_factor': 4}",
replication: scyllaclient.SimpleStrategy,
rf: 4,
},
{
replicationStmt: "{'class': 'NetworkTopologyStrategy', 'dc1': 1}",
replication: scyllaclient.NetworkTopologyStrategy,
rf: 1,
dcRF: map[string]int{
"dc1": 1,
},
},
{
replicationStmt: "{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}",
replication: scyllaclient.NetworkTopologyStrategy,
rf: 4,
dcRF: map[string]int{
"dc1": 2,
"dc2": 2,
},
},
{
replicationStmt: "{'class': 'NetworkTopologyStrategy', 'dc1': 3, 'dc2': 3}",
replication: scyllaclient.NetworkTopologyStrategy,
rf: 6,
dcRF: map[string]int{
"dc1": 3,
"dc2": 3,
},
},
}

client, err := scyllaclient.NewClient(scyllaclient.TestConfig(ManagedClusterHosts(), AgentAuthToken()), log.NewDevelopment())
if err != nil {
t.Fatal(err)
}
clusterSession := db.CreateSessionAndDropAllKeyspaces(t, client)
defer clusterSession.Close()

for i := range testCases {
tc := testCases[i]
if err := clusterSession.ExecStmt("DROP KEYSPACE IF EXISTS test_ks"); err != nil {
t.Fatal(err)
}
if err := clusterSession.ExecStmt("CREATE KEYSPACE test_ks WITH replication = " + tc.replicationStmt); err != nil {
t.Fatal(err)
}
ring, err := client.DescribeRing(context.Background(), "test_ks")
if err != nil {
t.Fatal(err)
}
if tc.replication != ring.Replication {
t.Fatalf("Replication: expected %s, got %s", tc.replication, ring.Replication)
}
if tc.rf != ring.RF {
t.Fatalf("RF: expected %d, got %d", tc.rf, ring.RF)
}
if !maputil.Equal(tc.dcRF, ring.DCrf) {
t.Fatalf("DCrf: expected %v, got %v", tc.dcRF, ring.DCrf)
}
}
}

func TestClientActiveRepairsIntegration(t *testing.T) {
client, err := scyllaclient.NewClient(scyllaclient.TestConfig(ManagedClusterHosts(), AgentAuthToken()), log.NewDevelopment())
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/scyllaclient/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ type Ring struct {
ReplicaTokens []ReplicaTokenRanges
HostDC map[string]string
Replication ReplicationStrategy
RF int
DCrf map[string]int // initialized only for NetworkTopologyStrategy
}

// Datacenters returns a list of datacenters the keyspace is replicated in.
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ func (s *Service) GetTargetSize(ctx context.Context, clusterID uuid.UUID, target
}

var total int64
for _, size := range report {
total += size
for _, sr := range report {
total += sr.Size
}

return total, err
Expand Down
Loading
Loading