Skip to content

Commit

Permalink
feat(restore_test): test setup during restore data stage
Browse files Browse the repository at this point in the history
This commit adds TestRestoreTablesPreparationIntegration.
Its purpose is to check setup changes performed by SM during
restore data stage. Right now it validates tombstone_gc mode
and compaction, but it can be easily extended in the future.
Test scenario:
- Run restore - hang on restore data stage
- Validate setup
- Pause restore
- Validate setup
- Resume restore - hang on restore data stage
- Validate setup
- Resume restore - wait for success
- Validate setup
- Validate restore success
  • Loading branch information
Michal-Leszczynski committed Oct 8, 2024
1 parent ed4c23b commit 85be062
Showing 1 changed file with 153 additions and 0 deletions.
153 changes: 153 additions & 0 deletions pkg/service/restore/restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync/atomic"
"testing"
"time"

Expand All @@ -19,6 +21,7 @@ import (
. "github.com/scylladb/scylla-manager/v3/pkg/testutils"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/db"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig"
"github.com/scylladb/scylla-manager/v3/pkg/util/httpx"
"github.com/scylladb/scylla-manager/v3/pkg/util/maputil"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
Expand Down Expand Up @@ -465,3 +468,153 @@ func TestRestoreTablesPausedIntegration(t *testing.T) {
}
}
}

// TestRestoreTablesPreparationIntegration checks the table setup (tombstone_gc
// mode and auto compaction) that is in effect at each point of a restore which
// is paused during the data stage and then resumed until it succeeds.
func TestRestoreTablesPreparationIntegration(t *testing.T) {
	// Scenario - setup corresponds to things like tombstone_gc mode or compaction being enabled:
	// Run restore - hang on restore data stage
	// Validate setup
	// Pause restore
	// Validate setup
	// Resume restore - hang on restore data stage
	// Validate setup
	// Resume restore - wait for success
	// Validate setup
	// Validate restore success

	h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts())

	Print("Keyspace setup")
	ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}"
	ks := randomizedName("prep_")
	ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks, 2))
	ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmt, ks, 2))

	Print("Table setup")
	tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int) WITH tombstone_gc = {'mode': 'repair'}"
	tab := randomizedName("tab_")
	ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
	ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))

	Print("Fill setup")
	fillTable(t, h.srcCluster.rootSession, 100, ks, tab)

	Print("Run backup")
	loc := []Location{testLocation("preparation", "")}
	S3InitBucket(t, loc[0].Path)
	ksFilter := []string{ks}
	tag := h.runBackup(t, map[string]any{
		"location": loc,
		"keyspace": ksFilter,
	})

	// runRestore starts a restore of the backed up keyspace under a fresh run ID
	// and sends its result on finishedRestore. It is meant to be run in its own
	// goroutine; finishedRestore must have room for one value so that the send
	// never blocks.
	runRestore := func(ctx context.Context, finishedRestore chan error) {
		grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
		h.dstCluster.RunID = uuid.NewTime()
		rawProps, err := json.Marshal(map[string]any{
			"location":       loc,
			"keyspace":       ksFilter,
			"snapshot_tag":   tag,
			"restore_tables": true,
		})
		if err != nil {
			// Hand the error to the waiting test instead of starting a restore
			// with unusable properties.
			finishedRestore <- errors.Wrap(err, "marshal restore properties")
			return
		}
		finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps)
	}

	// validateState asserts the tombstone_gc mode of the restored table and the
	// auto compaction setting on every host returned by ManagedClusterHosts.
	validateState := func(tombstone string, compaction bool) {
		// Validate tombstone_gc mode
		if got := tombstoneGCMode(t, h.dstCluster.rootSession, ks, tab); tombstone != got {
			t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
		}
		// Validate compaction
		for _, host := range ManagedClusterHosts() {
			enabled, err := h.dstCluster.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab)
			if err != nil {
				t.Fatal(errors.Wrapf(err, "check compaction on host %s", host))
			}
			if compaction != enabled {
				t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host)
			}
		}
	}

	// makeCopyPathsHang installs an interceptor which blocks every copypaths
	// request until hangCopyPaths is closed. The first such request closes
	// reachedDataStageChan; the compare-and-swap guarantees it is closed only once
	// even when many requests arrive concurrently.
	makeCopyPathsHang := func(reachedDataStage *atomic.Bool, reachedDataStageChan, hangCopyPaths chan struct{}) {
		h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
			if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") {
				if reachedDataStage.CompareAndSwap(false, true) {
					Print("Reached data stage")
					close(reachedDataStageChan)
				}
				Print("Wait for copy paths to stop hanging")
				<-hangCopyPaths
			}
			// NOTE(review): nil, nil presumably makes Hrt fall through to the
			// regular transport - confirm against the Hrt implementation.
			return nil, nil
		}))
	}

	// waitForDataStage blocks until the interceptor reports the first copypaths
	// request. If the restore finishes first, the data stage can no longer be
	// reached, so the test fails right away instead of hanging until timeout.
	waitForDataStage := func(reached <-chan struct{}, finished <-chan error) {
		select {
		case <-reached:
		case err := <-finished:
			t.Fatalf("Restore finished before reaching data stage, got: %v", err)
		}
	}

	// The signalling channels are only ever closed, so they need no buffer.
	var (
		reachedDataStage     = &atomic.Bool{}
		reachedDataStageChan = make(chan struct{})
		hangCopyPathsChan    = make(chan struct{})
	)
	Print("Make copy paths hang")
	makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan)

	Print("Run restore")
	finishedRestore := make(chan error, 1)
	restoreCtx, restoreCancel := context.WithCancel(context.Background())
	// Cancelling twice is harmless; the defer covers early test failures.
	defer restoreCancel()
	go runRestore(restoreCtx, finishedRestore)

	Print("Wait for data stage")
	waitForDataStage(reachedDataStageChan, finishedRestore)

	Print("Validate state during restore data")
	validateState("disabled", false)

	Print("Pause restore")
	restoreCancel()

	Print("Release copy paths")
	close(hangCopyPathsChan)

	Print("Wait for restore")
	err := <-finishedRestore
	if !errors.Is(err, context.Canceled) {
		// %v because err may be nil here.
		t.Fatalf("Expected restore to be paused, got: %v", err)
	}

	Print("Validate state during pause")
	validateState("disabled", true)

	// Fresh flag and channels: the previous ones are already set/closed.
	reachedDataStage = &atomic.Bool{}
	reachedDataStageChan = make(chan struct{})
	hangCopyPathsChan = make(chan struct{})
	Print("Make copy paths hang after pause")
	makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan)

	Print("Run restore after pause")
	finishedRestore = make(chan error, 1)
	go runRestore(context.Background(), finishedRestore)

	Print("Wait for data stage")
	waitForDataStage(reachedDataStageChan, finishedRestore)

	Print("Validate state during restore data after pause")
	validateState("disabled", false)

	Print("Release copy paths")
	close(hangCopyPathsChan)

	Print("Wait for restore")
	err = <-finishedRestore
	if err != nil {
		t.Fatalf("Expected restore to success, got: %s", err)
	}

	Print("Validate state after restore success")
	validateState("repair", true)

	Print("Validate table contents")
	h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})
}

0 comments on commit 85be062

Please sign in to comment.