Backup: don't ignore uploadHost error #3731

Merged: 5 commits, Feb 28, 2024
21 changes: 18 additions & 3 deletions pkg/service/backup/service.go
@@ -786,6 +786,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
Config: s.config,
Client: client,
},
PrevStage: run.Stage,
Metrics: s.metrics,
Units: run.Units,
OnRunProgress: s.putRunProgressLogError,
@@ -810,8 +811,10 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
}
defer clusterSession.Close()

w.AwaitSchemaAgreement(ctx, clusterSession)

if err := w.AwaitSchemaAgreement(ctx, clusterSession); err != nil {
w.Logger.Info(ctx, "Couldn't await schema agreement, backup of schema as CQL files will be skipped", "error", err)
return nil
}
if err = w.DumpSchema(ctx, clusterSession); err != nil {
w.Logger.Info(ctx, "Couldn't dump schema, backup of schema as CQL files will be skipped", "error", err)
w.Schema = nil
@@ -848,7 +851,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
prevStage := run.Stage

// Execute stages according to the stage order.
execStage := func(stage Stage, f func() error) error {
execStage := func(stage Stage, f func() error) (err error) {
// In purge only mode skip all stages before purge.
if target.PurgeOnly {
if stage.Index() < StagePurge.Index() {
@@ -874,6 +877,18 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
// Always cleanup stats
defer w.cleanup(ctx, hi)

if desc, ok := stageDescription[stage]; ok {
up := strings.ToUpper(desc[:1]) + desc[1:]
w.Logger.Info(ctx, up+"...")
defer func(start time.Time) {
if err != nil {
w.Logger.Error(ctx, up+" failed see exact errors above", "duration", timeutc.Since(start))
} else {
w.Logger.Info(ctx, "Done "+desc, "duration", timeutc.Since(start))
}
}(timeutc.Now())
}

// Run function
return errors.Wrap(f(), strings.ReplaceAll(name, "_", " "))
}
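
The hunk above replaces the per-stage "Starting.../Done..." logging that each worker method used to do on its own (removed further down from worker_index.go, worker_manifest.go and worker_purge.go) with one generic wrapper inside execStage, keyed by the stageDescription map added to stage.go. Below is a minimal, self-contained sketch of the same deferred-logging pattern, using only the standard library instead of the project's Logger and timeutc helpers; runStage and the messages are illustrative, not project code.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// runStage wraps a stage function with start/done/failed logging and a measured
// duration. The named return value err is what lets the deferred function see
// whether f failed, mirroring the execStage closure above.
func runStage(desc string, f func() error) (err error) {
	up := strings.ToUpper(desc[:1]) + desc[1:]
	fmt.Println(up + "...")
	defer func(start time.Time) {
		if err != nil {
			fmt.Printf("%s failed after %s: %v\n", up, time.Since(start), err)
		} else {
			fmt.Printf("Done %s in %s\n", desc, time.Since(start))
		}
	}(time.Now())
	return f()
}

func main() {
	_ = runStage("taking snapshot", func() error { return nil })
	_ = runStage("uploading snapshot files", func() error { return errors.New("dial error") })
}
```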
110 changes: 89 additions & 21 deletions pkg/service/backup/service_backup_integration_test.go
@@ -138,6 +138,29 @@ func defaultConfig() backup.Config {
return c
}

func (h *backupTestHelper) setInterceptorBlockEndpointOnFirstHost(method string, path string) {
var (
brokenHost string
mu sync.Mutex
)
h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if req.Method == method && req.URL.Path == path {
mu.Lock()
defer mu.Unlock()

if brokenHost == "" {
h.T.Log("Setting broken host", req.Host)
brokenHost = req.Host
}

if brokenHost == req.Host {
return nil, errors.New("dial error")
}
}
return nil, nil
}))
}

func (h *backupTestHelper) listS3Files() (manifests, schemas, files []string) {
h.T.Helper()
opts := &scyllaclient.RcloneListDirOpts{
@@ -900,6 +923,28 @@ func TestBackupWithNodesDownIntegration(t *testing.T) {
}
}

func assertMaxProgress(t *testing.T, pr backup.Progress) {
msg := "expected all bytes to be uploaded"
if _, left := pr.ByteProgress(); left != 0 {
t.Fatal(msg, pr)
}
for _, hpr := range pr.Hosts {
if _, left := hpr.ByteProgress(); left != 0 {
t.Fatal(msg, hpr)
}
for _, kpr := range hpr.Keyspaces {
if _, left := kpr.ByteProgress(); left != 0 {
t.Fatal(msg, kpr)
}
for _, tpr := range kpr.Tables {
if _, left := tpr.ByteProgress(); left != 0 {
t.Fatal(msg, tpr)
}
}
}
}
}

var backupTimeout = 10 * time.Second

// Tests resuming a stopped backup.
@@ -1082,28 +1127,9 @@ func TestBackupResumeIntegration(t *testing.T) {
})

t.Run("resume after snapshot failed", func(t *testing.T) {
var (
h = newBackupTestHelper(t, session, config, location, nil)
brokenHost string
mu sync.Mutex
)

h := newBackupTestHelper(t, session, config, location, nil)
Print("Given: snapshot fails on a host")
h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if req.Method == http.MethodPost && req.URL.Path == "/storage_service/snapshots" {
mu.Lock()
if brokenHost == "" {
t.Log("Setting broken host", req.Host)
brokenHost = req.Host
}
mu.Unlock()

if brokenHost == req.Host {
return nil, errors.New("dial error on snapshot")
}
}
return nil, nil
}))
h.setInterceptorBlockEndpointOnFirstHost(http.MethodPost, "/storage_service/snapshots")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -1138,6 +1164,48 @@
h.waitNoTransfers()
})

t.Run("resume after upload failed", func(t *testing.T) {
h := newBackupTestHelper(t, session, config, location, nil)
Print("Given: upload fails on a host")
h.setInterceptorBlockEndpointOnFirstHost(http.MethodPost, "/agent/rclone/job/progress")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := h.service.InitTarget(ctx, h.ClusterID, &target); err != nil {
t.Fatal(err)
}

Print("When: run backup")
err := h.service.Backup(ctx, h.ClusterID, h.TaskID, h.RunID, target)

Print("Then: it fails")
if err == nil {
t.Error("Expected error on run but got nil")
}
t.Log("Backup() error", err)

Print("Given: upload not longer fails on a host")
h.Hrt.SetInterceptor(nil)

Print("When: backup is resumed with new RunID")
runID := uuid.NewTime()
err = h.service.Backup(context.Background(), h.ClusterID, h.TaskID, runID, target)
if err != nil {
t.Error("Unexpected error", err)
}

Print("Then: data is uploaded")
pr, err := h.service.GetProgress(context.Background(), h.ClusterID, h.TaskID, runID)
if err != nil {
t.Error(err)
}
assertMaxProgress(t, pr)

Print("And: nothing is transferring")
h.waitNoTransfers()
})

t.Run("continue false", func(t *testing.T) {
var (
h = newBackupTestHelper(t, session, config, location, nil)
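
The tests above inject failures through h.Hrt.SetInterceptor and httpx.RoundTripperFunc, i.e. a function adapter for http.RoundTripper that can short-circuit selected requests. A standalone sketch of that idea against plain net/http follows; the roundTripperFunc type, the port, and the endpoint are illustrative assumptions, not the project's test harness.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// roundTripperFunc lets a plain function act as an http.RoundTripper,
// the same adapter shape as the project's httpx.RoundTripperFunc.
type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { return f(req) }

func main() {
	base := http.DefaultTransport
	client := &http.Client{
		Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
			// Fail a single endpoint to simulate a broken host; everything else
			// passes through to the real transport.
			if req.Method == http.MethodPost && req.URL.Path == "/storage_service/snapshots" {
				return nil, errors.New("dial error")
			}
			return base.RoundTrip(req)
		}),
	}

	_, err := client.Post("http://127.0.0.1:10000/storage_service/snapshots", "application/json", nil)
	fmt.Println("injected error:", err)
}
```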
46 changes: 12 additions & 34 deletions pkg/service/backup/stage.go
@@ -22,6 +22,18 @@ const (
StageDone Stage = "DONE"
)

var stageDescription = map[Stage]string{
StageInit: "initialising",
StageAwaitSchema: "awaiting schema agreement",
StageSnapshot: "taking snapshot",
StageIndex: "indexing snapshot files",
StageManifest: "uploading manifest files",
StageSchema: "uploading cql schema",
StageUpload: "uploading snapshot files",
StageMoveManifest: "moving manifest files",
StagePurge: "purging stale snapshots",
}

// StageOrder listing of all stages in the order of execution.
func StageOrder() []Stage {
return []Stage{
@@ -54,37 +66,3 @@ func (s Stage) Resumable() bool {
func (s Stage) Index() int {
return slice.Index(StageOrder(), s)
}

// RestoreStage specifies the restore worker stage.
type RestoreStage string

// RestoreStage enumeration.
const (
StageRestoreInit RestoreStage = "INIT"
StageRestoreDropViews RestoreStage = "DROP_VIEWS"
StageRestoreDisableTGC RestoreStage = "DISABLE_TGC"
StageRestoreData RestoreStage = "DATA"
StageRestoreRepair RestoreStage = "REPAIR"
StageRestoreEnableTGC RestoreStage = "ENABLE_TGC"
StageRestoreRecreateViews RestoreStage = "RECREATE_VIEWS"
StageRestoreDone RestoreStage = "DONE"
)

// RestoreStageOrder lists all restore stages in the order of their execution.
func RestoreStageOrder() []RestoreStage {
return []RestoreStage{
StageRestoreInit,
StageRestoreDropViews,
StageRestoreDisableTGC,
StageRestoreData,
StageRestoreRepair,
StageRestoreEnableTGC,
StageRestoreRecreateViews,
StageRestoreDone,
}
}

// Index returns stage position in RestoreStageOrder.
func (s RestoreStage) Index() int {
return slice.Index(RestoreStageOrder(), s)
}
1 change: 1 addition & 0 deletions pkg/service/backup/worker.go
@@ -64,6 +64,7 @@ type workerTools struct {
type worker struct {
workerTools

PrevStage Stage
Metrics metrics.BackupMetrics
Units []Unit
Schema *bytes.Buffer
14 changes: 3 additions & 11 deletions pkg/service/backup/worker_index.go
@@ -14,19 +14,9 @@ import (
"github.com/scylladb/go-set/strset"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
)

func (w *worker) Index(ctx context.Context, hosts []hostInfo, limits []DCLimit) (err error) {
w.Logger.Info(ctx, "Indexing snapshot files...")
defer func(start time.Time) {
if err != nil {
w.Logger.Error(ctx, "Indexing snapshot files failed see exact errors above", "duration", timeutc.Since(start))
} else {
w.Logger.Info(ctx, "Done indexing snapshot files", "duration", timeutc.Since(start))
}
}(timeutc.Now())

f := func(h hostInfo) error {
w.Logger.Info(ctx, "Indexing snapshot files on host", "host", h.IP)

@@ -157,7 +147,9 @@ func (w *worker) indexSnapshotDirs(ctx context.Context, h hostInfo) ([]snapshotD
}
}

if len(dirs) == 0 {
// In case of reindexing, it's possible that all snapshot dirs
// were already uploaded and deleted in the previous run (#3733).
if w.PrevStage != StageUpload && len(dirs) == 0 {
return nil, errors.New("could not find any files")
}

20 changes: 0 additions & 20 deletions pkg/service/backup/worker_manifest.go
@@ -6,25 +6,14 @@ import (
"bytes"
"context"
"net/http"
"time"

"github.com/pkg/errors"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
)

func (w *worker) UploadManifest(ctx context.Context, hosts []hostInfo) (stepError error) {
w.Logger.Info(ctx, "Uploading manifest files...")
defer func(start time.Time) {
if stepError != nil {
w.Logger.Error(ctx, "Uploading manifest files failed see exact errors above", "duration", timeutc.Since(start))
} else {
w.Logger.Info(ctx, "Done uploading manifest files", "duration", timeutc.Since(start))
}
}(timeutc.Now())

// Limit parallelism level, on huge clusters creating manifest content in
// memory for all nodes at the same time can lead to memory issues.
const maxParallel = 12
@@ -124,15 +113,6 @@ func (w *worker) uploadHostManifest(ctx context.Context, h hostInfo, m ManifestI
}

func (w *worker) MoveManifest(ctx context.Context, hosts []hostInfo) (err error) {
w.Logger.Info(ctx, "Moving manifest files...")
defer func(start time.Time) {
if err != nil {
w.Logger.Error(ctx, "Moving manifest files failed see exact errors above", "duration", timeutc.Since(start))
} else {
w.Logger.Info(ctx, "Done moving manifest files", "duration", timeutc.Since(start))
}
}(timeutc.Now())

rollbacks := make([]func(context.Context) error, len(hosts))

f := func(i int) (hostErr error) {
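
The UploadManifest hunk above keeps building manifest content for at most maxParallel = 12 hosts at a time, since doing it for every node of a huge cluster at once can exhaust memory; the project does this with its pkg/util/parallel helper. Below is a generic sketch of the same bounded-parallelism idea using golang.org/x/sync/errgroup instead; the function names, host names, and the limit of 2 are assumptions for illustration only.

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// uploadManifests runs upload for every host, but never more than limit at once,
// so per-host manifest buffers are not all held in memory together.
func uploadManifests(hosts []string, limit int, upload func(host string) error) error {
	var g errgroup.Group
	g.SetLimit(limit) // cap concurrent goroutines, mirroring maxParallel above
	for _, h := range hosts {
		h := h // capture loop variable (pre-Go 1.22 semantics)
		g.Go(func() error { return upload(h) })
	}
	return g.Wait() // first non-nil error, after all goroutines finish
}

func main() {
	hosts := []string{"host-1", "host-2", "host-3"}
	err := uploadManifests(hosts, 2, func(host string) error {
		fmt.Println("uploading manifest for", host)
		return nil
	})
	fmt.Println("done, err =", err)
}
```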
11 changes: 0 additions & 11 deletions pkg/service/backup/worker_purge.go
@@ -4,25 +4,14 @@ package backup

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/scylladb/go-log"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
)

func (w *worker) Purge(ctx context.Context, hosts []hostInfo, retentionMap RetentionMap) (err error) {
w.Logger.Info(ctx, "Purging stale snapshots...")
defer func(start time.Time) {
if err != nil {
w.Logger.Error(ctx, "Purging stale snapshots failed see exact errors above", "duration", timeutc.Since(start))
} else {
w.Logger.Info(ctx, "Done purging stale snapshots", "duration", timeutc.Since(start))
}
}(timeutc.Now())

// List manifests in all locations
manifests, err := listManifestsInAllLocations(ctx, w.Client, hosts, w.ClusterID)
if err != nil {