Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
limit healthchecking when we pass in keyspaces_to_watch (#161)
Browse files Browse the repository at this point in the history
Signed-off-by: Serry Park <[email protected]>

Signed-off-by: Serry Park <[email protected]>

Co-authored-by: Serry Park <[email protected]>
  • Loading branch information
setassociative and spark4 authored Jun 16, 2020
1 parent 067c033 commit 1bc2250
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 0 deletions.
56 changes: 56 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,59 @@ func (fbs *FilterByShard) isIncluded(tablet *topodatapb.Tablet) bool {
}
return false
}

// FilterByKeyspace is a TabletRecorder filter that filters tablets by
// keyspace
type FilterByKeyspace struct {
tr TabletRecorder

keyspaces map[string]bool
}

// NewFilterByKeyspace creates a new FilterByKeyspace on top of an existing
// TabletRecorder. Each filter is a keyspace entry. All tablets that match
// a keyspace will be forwarded to the underlying TabletRecorder.
func NewFilterByKeyspace(tr TabletRecorder, selectedKeyspaces []string) *FilterByKeyspace {
m := make(map[string]bool)
for _, keyspace := range selectedKeyspaces {
m[keyspace] = true
}

return &FilterByKeyspace{
tr: tr,
keyspaces: m,
}
}

// AddTablet is part of the TabletRecorder interface.
func (fbk *FilterByKeyspace) AddTablet(tablet *topodatapb.Tablet, name string) {
if fbk.isIncluded(tablet) {
fbk.tr.AddTablet(tablet, name)
}
}

// RemoveTablet is part of the TabletRecorder interface.
func (fbk *FilterByKeyspace) RemoveTablet(tablet *topodatapb.Tablet) {
if fbk.isIncluded(tablet) {
fbk.tr.RemoveTablet(tablet)
}
}

// ReplaceTablet is part of the TabletRecorder interface.
func (fbk *FilterByKeyspace) ReplaceTablet(old *topodatapb.Tablet, new *topodatapb.Tablet, name string) {
if old.Keyspace != new.Keyspace {
log.Errorf("Error replacing old tablet in %v with new tablet in %v", old.Keyspace, new.Keyspace)
return
}

if fbk.isIncluded(new) {
fbk.tr.ReplaceTablet(old, new, name)
}
}

// isIncluded returns true if the tablet's keyspace should be
// forwarded to the underlying TabletRecorder.
func (fbk *FilterByKeyspace) isIncluded(tablet *topodatapb.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}
95 changes: 95 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package discovery

import (
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -410,3 +411,97 @@ func TestFilterByShard(t *testing.T) {
}
}
}

var (
testFilterByKeyspace = []struct {
keyspace string
expected bool
}{
{"ks1", true},
{"ks2", true},
{"ks3", false},
{"ks4", true},
{"ks5", true},
{"ks6", false},
{"ks7", false},
}
testKeyspacesToWatch = []string{"ks1", "ks2", "ks4", "ks5"}
testCell = "testCell"
testShard = "testShard"
testHostName = "testHostName"
)

func TestFilterByKeyspace(t *testing.T) {
hc := NewFakeHealthCheck()
tr := NewFilterByKeyspace(hc, testKeyspacesToWatch)
ts := memorytopo.NewServer(testCell)
tw := NewCellTabletsWatcher(context.Background(), ts, tr, testCell, 10*time.Minute, true, 5)

for _, test := range testFilterByKeyspace {
// Add a new tablet to the topology.
port := rand.Int31n(1000)
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: testCell,
Uid: rand.Uint32(),
},
Hostname: testHostName,
PortMap: map[string]int32{
"vt": port,
},
Keyspace: test.keyspace,
Shard: testShard,
}

got := tr.isIncluded(tablet)
if got != test.expected {
t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected)
}

if err := ts.CreateTablet(context.Background(), tablet); err != nil {
t.Errorf("CreateTablet failed: %v", err)
}

tw.loadTablets()
key := TabletToMapKey(tablet)
allTablets := hc.GetAllTablets()

if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tablet) != test.expected {
t.Errorf("Error adding tablet - got %v; want %v", ok, test.expected)
}

// Replace the tablet we added above
tabletReplacement := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: testCell,
Uid: rand.Uint32(),
},
Hostname: testHostName,
PortMap: map[string]int32{
"vt": port,
},
Keyspace: test.keyspace,
Shard: testShard,
}
got = tr.isIncluded(tabletReplacement)
if got != test.expected {
t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected)
}
if err := ts.CreateTablet(context.Background(), tabletReplacement); err != nil {
t.Errorf("CreateTablet failed: %v", err)
}

tw.loadTablets()
key = TabletToMapKey(tabletReplacement)
allTablets = hc.GetAllTablets()

if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tabletReplacement) != test.expected {
t.Errorf("Error replacing tablet - got %v; want %v", ok, test.expected)
}

// Delete the tablet
if err := ts.DeleteTablet(context.Background(), tabletReplacement.Alias); err != nil {
t.Fatalf("DeleteTablet failed: %v", err)
}
}
}
2 changes: 2 additions & 0 deletions go/vt/vtgate/gateway/discoverygateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ func createDiscoveryGateway(ctx context.Context, hc discovery.HealthCheck, serv
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
tr = fbs
} else if len(KeyspacesToWatch) > 0 {
tr = discovery.NewFilterByKeyspace(dg.hc, KeyspacesToWatch)
}

ctw := discovery.NewCellTabletsWatcher(ctx, topoServer, tr, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency)
Expand Down

0 comments on commit 1bc2250

Please sign in to comment.