Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet repair #3725

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions pkg/scyllaclient/client_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ import (
"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/scylla-manager/v3/pkg/dht"
"github.com/scylladb/scylla-manager/v3/pkg/util/slice"
"go.uber.org/multierr"

"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/pkg/util/pointer"
"github.com/scylladb/scylla-manager/v3/pkg/util/prom"
"github.com/scylladb/scylla-manager/v3/pkg/util/slice"
"github.com/scylladb/scylla-manager/v3/swagger/gen/scylla/v1/client/operations"
"github.com/scylladb/scylla-manager/v3/swagger/gen/scylla/v1/models"
"go.uber.org/multierr"
)

// ErrHostInvalidResponse is to indicate that one of the root-causes is the invalid response from scylla-server.
Expand Down Expand Up @@ -246,15 +245,33 @@ func (c *Client) hosts(ctx context.Context) ([]string, error) {
return v, nil
}

// Keyspaces return a list of all the keyspaces.
func (c *Client) Keyspaces(ctx context.Context) ([]string, error) {
resp, err := c.scyllaOps.StorageServiceKeyspacesGet(&operations.StorageServiceKeyspacesGetParams{Context: ctx})
// KeyspaceReplication describes keyspace replication type.
// It is an alias of string, so its values can be passed directly
// as the 'replication' param of the keyspaces listing call.
type KeyspaceReplication = string

// KeyspaceReplication enum.
const (
// ReplicationAll is used by Keyspaces to list all the keyspaces.
ReplicationAll = "all"
// ReplicationVnode selects vnode replicated keyspaces.
ReplicationVnode = "vnodes"
// ReplicationTablet selects tablet replicated keyspaces.
ReplicationTablet = "tablets"
)

// ReplicationKeyspaces returns a list of keyspaces with given replication.
// The replication value is forwarded as the 'replication' param of the
// keyspaces listing call; errors of that call are returned as is.
func (c *Client) ReplicationKeyspaces(ctx context.Context, replication KeyspaceReplication) ([]string, error) {
	params := operations.StorageServiceKeyspacesGetParams{
		Context:     ctx,
		Replication: &replication,
	}

	res, err := c.scyllaOps.StorageServiceKeyspacesGet(&params)
	if err != nil {
		return nil, err
	}
	return res.Payload, nil
}

// Keyspaces returns a list of all the keyspaces.
// It is a shorthand for ReplicationKeyspaces called with ReplicationAll.
//
// NOTE(review): this sends the 'replication' param on every call - confirm
// that Scylla versions which do not know this param still accept the request.
func (c *Client) Keyspaces(ctx context.Context) ([]string, error) {
	all, err := c.ReplicationKeyspaces(ctx, ReplicationAll)
	return all, err
}

// Tables returns a slice of table names in a given keyspace.
func (c *Client) Tables(ctx context.Context, keyspace string) ([]string, error) {
resp, err := c.scyllaOps.ColumnFamilyNameGet(&operations.ColumnFamilyNameGetParams{Context: ctx})
Expand Down Expand Up @@ -374,12 +391,25 @@ func (c *Client) metrics(ctx context.Context, host, name string) (map[string]*pr
return prom.ParseText(resp.Body)
}

// DescribeRing returns a description of token range of a given keyspace.
func (c *Client) DescribeRing(ctx context.Context, keyspace string) (Ring, error) {
resp, err := c.scyllaOps.StorageServiceDescribeRingByKeyspaceGet(&operations.StorageServiceDescribeRingByKeyspaceGetParams{
// DescribeTabletRing returns a description of token range of a given tablet table.
// Both the keyspace and the table are sent with the describe ring request.
func (c *Client) DescribeTabletRing(ctx context.Context, keyspace, table string) (Ring, error) {
	params := operations.StorageServiceDescribeRingByKeyspaceGetParams{
		Context:  ctx,
		Keyspace: keyspace,
		Table:    &table,
	}
	return c.describeRing(&params)
}

// DescribeVnodeRing returns a description of token range of a given vnode keyspace.
// Only the keyspace is sent with the describe ring request, no table is set.
func (c *Client) DescribeVnodeRing(ctx context.Context, keyspace string) (Ring, error) {
	params := operations.StorageServiceDescribeRingByKeyspaceGetParams{
		Context:  ctx,
		Keyspace: keyspace,
	}
	return c.describeRing(&params)
}

func (c *Client) describeRing(params *operations.StorageServiceDescribeRingByKeyspaceGetParams) (Ring, error) {
resp, err := c.scyllaOps.StorageServiceDescribeRingByKeyspaceGet(params)
if err != nil {
return Ring{}, err
}
Expand Down
93 changes: 93 additions & 0 deletions pkg/scyllaclient/ring_describer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (C) 2024 ScyllaDB

package scyllaclient

import (
"context"

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
)

// RingDescriber describes token rings on table basis for both vnode and tablet tables.
type RingDescriber interface {
// DescribeRing returns the token ring of the given keyspace and table.
DescribeRing(ctx context.Context, keyspace, table string) (Ring, error)
}

// ringCache is a single entry cache of a ring description.
// Keyspace and Table identify the ring currently stored in Ring.
type ringCache struct {
Keyspace string
Table string
Ring Ring
}

// tryGetRing returns the cached ring and true when the cache entry was stored
// for exactly the given keyspace and table. Otherwise it returns an empty
// Ring and false.
func (rc *ringCache) tryGetRing(keyspace, table string) (Ring, bool) {
	if rc.Keyspace != keyspace || rc.Table != table {
		return Ring{}, false
	}
	return rc.Ring, true
}

// setRing replaces the cache entry with the given ring,
// remembering the keyspace and table it belongs to.
func (rc *ringCache) setRing(keyspace, table string, ring Ring) {
	*rc = ringCache{
		Keyspace: keyspace,
		Table:    table,
		Ring:     ring,
	}
}

// ringDescriber is the RingDescriber returned by NewRingDescriber.
type ringDescriber struct {
// client is used for making the describe ring calls.
client *Client
// tabletKs is the set of keyspaces which DescribeRing describes
// with DescribeTabletRing instead of DescribeVnodeRing.
tabletKs *strset.Set
// cache holds the ring most recently fetched by DescribeRing.
cache *ringCache
}

// DescribeRing returns the token ring of the given table.
//
// Keyspaces contained in rd.tabletKs are described per table with
// Client.DescribeTabletRing, all other keyspaces are described per keyspace
// with Client.DescribeVnodeRing.
//
// The most recently fetched ring is cached. Tablet rings are cached per
// keyspace and table. Vnode rings are cached per keyspace only, because
// DescribeVnodeRing does not take the table into account, so describing
// consecutive tables of the same vnode keyspace results in a single API call.
//
// Client errors are returned wrapped with "describe ring" and leave
// the cache untouched.
func (rd *ringDescriber) DescribeRing(ctx context.Context, keyspace, table string) (Ring, error) {
	tablet := rd.tabletKs.Has(keyspace)

	// Vnode ring is shared by all tables of a keyspace,
	// so it is cached under an empty table name.
	cacheTable := table
	if !tablet {
		cacheTable = ""
	}
	if ring, ok := rd.cache.tryGetRing(keyspace, cacheTable); ok {
		return ring, nil
	}

	var (
		ring Ring
		err  error
	)
	if tablet {
		ring, err = rd.client.DescribeTabletRing(ctx, keyspace, table)
	} else {
		ring, err = rd.client.DescribeVnodeRing(ctx, keyspace)
	}
	if err != nil {
		return Ring{}, errors.Wrap(err, "describe ring")
	}

	rd.cache.setRing(keyspace, cacheTable, ring)
	return ring, nil
}

// NewRingDescriber returns a RingDescriber using the given client.
// The set of tablet replicated keyspaces is resolved once, on creation,
// and the returned describer starts with an empty ring cache.
func NewRingDescriber(ctx context.Context, client *Client) RingDescriber {
	rd := new(ringDescriber)
	rd.client = client
	rd.tabletKs = getTabletKs(ctx, client)
	rd.cache = &ringCache{}
	return rd
}

// getTabletKs returns set of tablet replicated keyspaces.
// The returned set is empty when the tablet API appears to be unsupported.
func getTabletKs(ctx context.Context, client *Client) *strset.Set {
	// Errors are assumed to mean that the endpoint rejected the 'replication'
	// param, i.e. that this Scylla version does not support tablet API.
	// Any other kind of error is left to be reported by other API calls.
	tablets, err := client.ReplicationKeyspaces(ctx, ReplicationTablet)
	if err != nil {
		return strset.New()
	}
	vnodes, err := client.ReplicationKeyspaces(ctx, ReplicationVnode)
	if err != nil {
		return strset.New()
	}

	// Two successful calls are not enough - a keyspace listed as both
	// tablet and vnode replicated shows that the 'replication' param
	// was silently ignored.
	tabletKs := strset.New(tablets...)
	if !tabletKs.HasAny(vnodes...) {
		return tabletKs
	}
	return strset.New()
}
55 changes: 30 additions & 25 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,36 +220,39 @@ func (s *Service) GetTarget(ctx context.Context, clusterID uuid.UUID, properties
AllTables: true,
}

ringDescriber := scyllaclient.NewRingDescriber(ctx, client)
for _, keyspace := range keyspaces {
tables, err := client.Tables(ctx, keyspace)
if err != nil {
return t, errors.Wrapf(err, "keyspace %s: get tables", keyspace)
}

// Get the ring description and skip local data
ring, err := client.DescribeRing(ctx, keyspace)
if err != nil {
return t, errors.Wrapf(err, "keyspace %s: get ring description", keyspace)
}
if ring.Replication == scyllaclient.LocalStrategy {
if strings.HasPrefix(keyspace, "system") && keyspace != "system_schema" {
continue
for _, tab := range tables {
// Get the ring description and skip local data
ring, err := ringDescriber.DescribeRing(ctx, keyspace, tab)
if err != nil {
return t, errors.Wrapf(err, "%s.%s: get ring description", keyspace, tab)
}
} else {
// Check if keyspace has replica in any DC
if !targetDCs.HasAny(ring.Datacenters()...) {
continue
if ring.Replication == scyllaclient.LocalStrategy {
if strings.HasPrefix(keyspace, "system") && keyspace != "system_schema" {
continue
}
} else {
// Check if keyspace has replica in any DC
if !targetDCs.HasAny(ring.Datacenters()...) {
continue
}
}
}

// Collect ring information
rings[keyspace] = ring
// Collect ring information
rings[keyspace+"."+tab] = ring

// Do not filter system_schema
if keyspace == systemSchema {
systemSchemaUnit.Tables = tables
} else {
f.Add(keyspace, tables)
// Do not filter system_schema
if keyspace == systemSchema {
systemSchemaUnit.Tables = append(systemSchemaUnit.Tables, tab)
} else {
f.Add(keyspace, []string{tab})
}
}
}

Expand Down Expand Up @@ -306,11 +309,13 @@ func (s *Service) getLiveNodes(ctx context.Context, client *scyllaclient.Client,
if len(liveNodes) < len(nodes) {
hosts := strset.New(liveNodes.Hosts()...)
for i := range target.Units {
r := rings[target.Units[i].Keyspace]
if r.Replication != scyllaclient.LocalStrategy {
for _, rt := range r.ReplicaTokens {
if !hosts.HasAny(rt.ReplicaSet...) {
return nil, errors.Errorf("not enough live nodes to backup keyspace %s", target.Units[i].Keyspace)
for _, t := range target.Units[i].Tables {
r := rings[target.Units[i].Keyspace+"."+t]
if r.Replication != scyllaclient.LocalStrategy {
for _, rt := range r.ReplicaTokens {
if !hosts.HasAny(rt.ReplicaSet...) {
return nil, errors.Errorf("not enough live nodes to backup %s.%s", target.Units[i].Keyspace, t)
}
}
}
}
Expand Down
Loading
Loading