Skip to content

Commit

Permalink
tests/e2e: add test cases related to HashKV
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Fu <[email protected]>
(cherry picked from commit 6f93af8)
Signed-off-by: Wei Fu <[email protected]>
  • Loading branch information
fuweid committed Aug 20, 2024
1 parent 21f6ad0 commit 0108fe7
Showing 1 changed file with 241 additions and 0 deletions.
241 changes: 241 additions & 0 deletions tests/e2e/hashkv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"

"go.etcd.io/etcd/client/pkg/v3/fileutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

func TestVerifyHashKVAfterCompact(t *testing.T) {
scenarios := []struct {
clusterVersion e2e.ClusterVersion
keys []string // used for data generators
}{
{
clusterVersion: e2e.CurrentVersion,
keys: []string{"key0"},
},
{
clusterVersion: e2e.CurrentVersion,
keys: []string{"key0", "key1"},
},
{
clusterVersion: e2e.QuorumLastVersion,
keys: []string{"key0"},
},
{
clusterVersion: e2e.QuorumLastVersion,
keys: []string{"key0", "key1"},
},
}

for _, compactedOnTombstoneRev := range []bool{false, true} {
for _, scenario := range scenarios {
t.Run(fmt.Sprintf("compactedOnTombstone=%v - %s - Keys=%v", compactedOnTombstoneRev, scenario.clusterVersion, scenario.keys), func(t *testing.T) {
e2e.BeforeTest(t)

if scenario.clusterVersion != e2e.CurrentVersion {
if !fileutil.Exist(e2e.BinPathLastRelease) {
t.Skipf("%q does not exist", e2e.BinPathLastRelease)
}
}

ctx := context.Background()

cfg := e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
IsClientAutoTLS: true,
ClientTLS: e2e.ClientTLS,
Version: scenario.clusterVersion,
}

clus, err := e2e.NewEtcdProcessCluster(t, &cfg)
require.NoError(t, err)
defer clus.Close()

tombstoneRevs, lastestRev := populateDataForHashKV(t, clus, &cfg, scenario.keys)

compactedOnRev := tombstoneRevs[0]

// If compaction revision isn't a tombstone, select a revision in the middle of two tombstones.
if !compactedOnTombstoneRev {
compactedOnRev = (tombstoneRevs[0] + tombstoneRevs[1]) / 2
require.Greater(t, tombstoneRevs[1], compactedOnRev)
}

cli := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS)
defer cli.Close()

t.Logf("COMPACT on rev=%d", compactedOnRev)
_, err = cli.Compact(ctx, compactedOnRev, clientv3.WithCompactPhysical())
require.NoError(t, err)

for rev := compactedOnRev; rev <= lastestRev; rev++ {
verifyConsistentHashKVAcrossAllMembers(t, cli, rev)
}
})
}
}
}

func TestVerifyHashKVAfterTwoCompactionsOnTombstone_MixVersions(t *testing.T) {
e2e.BeforeTest(t)

if !fileutil.Exist(e2e.BinPathLastRelease) {
t.Skipf("%q does not exist", e2e.BinPathLastRelease)
}

ctx := context.Background()

cfg := e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
IsClientAutoTLS: true,
ClientTLS: e2e.ClientTLS,
Version: e2e.QuorumLastVersion,
}

clus, err := e2e.NewEtcdProcessCluster(t, &cfg)
require.NoError(t, err)
defer clus.Close()

t.Cleanup(func() { clus.Close() })

tombstoneRevs, lastestRev := populateDataForHashKV(t, clus, &cfg, []string{"key0"})

cli := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS)
defer cli.Close()

firstCompactOnRev := tombstoneRevs[0]
t.Logf("COMPACT rev=%d", firstCompactOnRev)
_, err = cli.Compact(ctx, firstCompactOnRev, clientv3.WithCompactPhysical())
require.NoError(t, err)

secondCompactOnRev := tombstoneRevs[1]
t.Logf("COMPACT rev=%d", secondCompactOnRev)
_, err = cli.Compact(ctx, secondCompactOnRev, clientv3.WithCompactPhysical())
require.NoError(t, err)

for rev := secondCompactOnRev; rev <= lastestRev; rev++ {
verifyConsistentHashKVAcrossAllMembers(t, cli, rev)
}
}

func TestVerifyHashKVAfterCompactionOnLastTombstone_MixVersions(t *testing.T) {
e2e.BeforeTest(t)

if !fileutil.Exist(e2e.BinPathLastRelease) {
t.Skipf("%q does not exist", e2e.BinPathLastRelease)
}

for _, keys := range [][]string{
[]string{"key0"},
[]string{"key0", "key1"},
} {
t.Run(fmt.Sprintf("#%v", keys), func(t *testing.T) {
ctx := context.Background()

cfg := e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
IsClientAutoTLS: true,
ClientTLS: e2e.ClientTLS,
Version: e2e.QuorumLastVersion,
}

clus, err := e2e.NewEtcdProcessCluster(t, &cfg)
require.NoError(t, err)
defer clus.Close()

tombstoneRevs, lastestRev := populateDataForHashKV(t, clus, &cfg, keys)

cli := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS)
defer cli.Close()

compactOnRev := tombstoneRevs[len(tombstoneRevs)-1]
t.Logf("COMPACT rev=%d", compactOnRev)
_, err = cli.Compact(ctx, compactOnRev, clientv3.WithCompactPhysical())
require.NoError(t, err)

for rev := compactOnRev; rev <= lastestRev; rev++ {
verifyConsistentHashKVAcrossAllMembers(t, cli, rev)
}

})
}
}

// populateDataForHashKV populates some sample data, and return a slice of tombstone
// revisions and the latest revision
func populateDataForHashKV(t *testing.T, clus *e2e.EtcdProcessCluster, clusCfg *e2e.EtcdProcessClusterConfig, keys []string) ([]int64, int64) {
c := newClient(t, clus.EndpointsGRPC(), clusCfg.ClientTLS, clusCfg.IsClientAutoTLS)
defer c.Close()

ctx := context.Background()
totalOperations := 40

var (
tombStoneRevs []int64
latestRev int64
)

deleteStep := 10 // submit a delete operation on every 10 operations
for i := 1; i <= totalOperations; i++ {
if i%deleteStep == 0 {
t.Logf("Deleting key=%s", keys[0]) // Only delete the first key for simplicity
resp, derr := c.Delete(ctx, keys[0])
require.NoError(t, derr)
latestRev = resp.Header.Revision
tombStoneRevs = append(tombStoneRevs, resp.Header.Revision)
continue
}

value := fmt.Sprintf("%d", i)
var ops []clientv3.Op
for _, key := range keys {
ops = append(ops, clientv3.OpPut(key, value))
}

t.Logf("Writing keys: %v, value: %s", keys, value)
resp, terr := c.Txn(ctx).Then(ops...).Commit()
require.NoError(t, terr)
require.True(t, resp.Succeeded)
require.Len(t, resp.Responses, len(ops))
latestRev = resp.Header.Revision
}
return tombStoneRevs, latestRev
}

func verifyConsistentHashKVAcrossAllMembers(t *testing.T, cli *clientv3.Client, hashKVOnRev int64) {
t.Logf("HashKV on rev=%d", hashKVOnRev)
resp, err := hashKVs(cli.Endpoints(), cli)
require.NoError(t, err)

require.Greater(t, len(resp), 1)
require.True(t, resp[0].Hash != 0)
t.Logf("One Hash value is %d", resp[0].Hash)

for i := 1; i < len(resp); i++ {
require.Equal(t, resp[0].Hash, resp[i].Hash)
}
}

0 comments on commit 0108fe7

Please sign in to comment.