Skip to content

Commit

Permalink
feat(restore_test): extend TestRestoreTablesPreparationIntegration wi…
Browse files Browse the repository at this point in the history
…th transfers

This way this test also checks transfers before and after backup.
It also checks transfers before, in the middle, when paused,
when resumed, and after restore.
  • Loading branch information
Michal-Leszczynski committed Oct 8, 2024
1 parent efdd2f0 commit 2962fa8
Showing 1 changed file with 65 additions and 24 deletions.
89 changes: 65 additions & 24 deletions pkg/service/restore/restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,22 +498,77 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
Print("Fill setup")
fillTable(t, h.srcCluster.rootSession, 100, ks, tab)

validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int) {
// Validate tombstone_gc mode
if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got {
t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
}
// Validate compaction
for _, host := range ch.Client.Config().Hosts {
enabled, err := ch.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab)
if err != nil {
t.Fatal(errors.Wrapf(err, "check compaction on host %s", host))
}
if compaction != enabled {
t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host)
}
}
// Validate transfers
for _, host := range ch.Client.Config().Hosts {
got, err := ch.Client.RcloneGetTransfers(context.Background(), host)
if err != nil {
t.Fatal(errors.Wrapf(err, "check transfers on host %s", host))
}
if transfers != got {
t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host)
}
}
}

shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost())
if err != nil {
t.Fatal(err)
}
transfers0 := 2 * int(shardCnt)

// Set initial transfers
for _, host := range ManagedClusterHosts() {
err := h.dstCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
if err != nil {
t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
}
}
for _, host := range ManagedSecondClusterHosts() {
err := h.srcCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
if err != nil {
t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
}
}

Print("Validate state before backup")
validateState(h.srcCluster, "repair", true, 10)

Print("Run backup")
loc := []Location{testLocation("preparation", "")}
S3InitBucket(t, loc[0].Path)
ksFilter := []string{ks}
tag := h.runBackup(t, map[string]any{
"location": loc,
"keyspace": ksFilter,
"location": loc,
"keyspace": ksFilter,
"transfers": 3,
})

Print("Validate state after backup")
validateState(h.srcCluster, "repair", true, 3)

runRestore := func(ctx context.Context, finishedRestore chan error) {
grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
h.dstCluster.RunID = uuid.NewTime()
rawProps, err := json.Marshal(map[string]any{
"location": loc,
"keyspace": ksFilter,
"snapshot_tag": tag,
"transfers": 0,
"restore_tables": true,
})
if err != nil {
Expand All @@ -522,23 +577,6 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps)
}

validateState := func(tombstone string, compaction bool) {
// Validate tombstone_gc mode
if got := tombstoneGCMode(t, h.dstCluster.rootSession, ks, tab); tombstone != got {
t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
}
// Validate compaction
for _, host := range ManagedClusterHosts() {
enabled, err := h.dstCluster.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab)
if err != nil {
t.Fatal(errors.Wrapf(err, "check compaction on host %s", host))
}
if compaction != enabled {
t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host)
}
}
}

makeCopyPathsHang := func(reachedDataStage *atomic.Bool, reachedDataStageChan, hangCopyPaths chan struct{}) {
h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") {
Expand All @@ -561,6 +599,9 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
Print("Make copy paths hang")
makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan)

Print("Validate state before restore")
validateState(h.dstCluster, "repair", true, 10)

Print("Run restore")
finishedRestore := make(chan error)
restoreCtx, restoreCancel := context.WithCancel(context.Background())
Expand All @@ -570,7 +611,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
<-reachedDataStageChan

Print("Validate state during restore data")
validateState("disabled", false)
validateState(h.dstCluster, "disabled", false, transfers0)

Print("Pause restore")
restoreCancel()
Expand All @@ -579,13 +620,13 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
close(hangCopyPathsChan)

Print("Wait for restore")
err := <-finishedRestore
err = <-finishedRestore
if !errors.Is(err, context.Canceled) {
t.Fatalf("Expected restore to be paused, got: %s", err)
}

Print("Validate state during pause")
validateState("disabled", true)
validateState(h.dstCluster, "disabled", true, transfers0)

reachedDataStage = &atomic.Bool{}
reachedDataStageChan = make(chan struct{})
Expand All @@ -601,7 +642,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
<-reachedDataStageChan

Print("Validate state during restore data after pause")
validateState("disabled", false)
validateState(h.dstCluster, "disabled", false, transfers0)

Print("Release copy paths")
close(hangCopyPathsChan)
Expand All @@ -613,7 +654,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
}

Print("Validate state after restore success")
validateState("repair", true)
validateState(h.dstCluster, "repair", true, transfers0)

Print("Validate table contents")
h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})
Expand Down

0 comments on commit 2962fa8

Please sign in to comment.