diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index fdf14f11a19..b623457663c 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -437,3 +437,59 @@ func (fbs *FilterByShard) isIncluded(tablet *topodatapb.Tablet) bool { } return false } + +// FilterByKeyspace is a TabletRecorder filter that filters tablets by +// keyspace +type FilterByKeyspace struct { + tr TabletRecorder + + keyspaces map[string]bool +} + +// NewFilterByKeyspace creates a new FilterByKeyspace on top of an existing +// TabletRecorder. Each filter is a keyspace entry. All tablets that match +// a keyspace will be forwarded to the underlying TabletRecorder. +func NewFilterByKeyspace(tr TabletRecorder, selectedKeyspaces []string) *FilterByKeyspace { + m := make(map[string]bool) + for _, keyspace := range selectedKeyspaces { + m[keyspace] = true + } + + return &FilterByKeyspace{ + tr: tr, + keyspaces: m, + } +} + +// AddTablet is part of the TabletRecorder interface. +func (fbk *FilterByKeyspace) AddTablet(tablet *topodatapb.Tablet, name string) { + if fbk.isIncluded(tablet) { + fbk.tr.AddTablet(tablet, name) + } +} + +// RemoveTablet is part of the TabletRecorder interface. +func (fbk *FilterByKeyspace) RemoveTablet(tablet *topodatapb.Tablet) { + if fbk.isIncluded(tablet) { + fbk.tr.RemoveTablet(tablet) + } +} + +// ReplaceTablet is part of the TabletRecorder interface. +func (fbk *FilterByKeyspace) ReplaceTablet(old *topodatapb.Tablet, new *topodatapb.Tablet, name string) { + if old.Keyspace != new.Keyspace { + log.Errorf("Error replacing old tablet in %v with new tablet in %v", old.Keyspace, new.Keyspace) + return + } + + if fbk.isIncluded(new) { + fbk.tr.ReplaceTablet(old, new, name) + } +} + +// isIncluded returns true if the tablet's keyspace should be +// forwarded to the underlying TabletRecorder. +func (fbk *FilterByKeyspace) isIncluded(tablet *topodatapb.Tablet) bool { + _, exist := fbk.keyspaces[tablet.Keyspace] + return exist +} diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index c3cd553699a..02c664fa15b 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package discovery import ( + "math/rand" "testing" "time" @@ -410,3 +411,97 @@ func TestFilterByShard(t *testing.T) { } } } + +var ( + testFilterByKeyspace = []struct { + keyspace string + expected bool + }{ + {"ks1", true}, + {"ks2", true}, + {"ks3", false}, + {"ks4", true}, + {"ks5", true}, + {"ks6", false}, + {"ks7", false}, + } + testKeyspacesToWatch = []string{"ks1", "ks2", "ks4", "ks5"} + testCell = "testCell" + testShard = "testShard" + testHostName = "testHostName" +) + +func TestFilterByKeyspace(t *testing.T) { + hc := NewFakeHealthCheck() + tr := NewFilterByKeyspace(hc, testKeyspacesToWatch) + ts := memorytopo.NewServer(testCell) + tw := NewCellTabletsWatcher(context.Background(), ts, tr, testCell, 10*time.Minute, true, 5) + + for _, test := range testFilterByKeyspace { + // Add a new tablet to the topology. + port := rand.Int31n(1000) + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: rand.Uint32(), + }, + Hostname: testHostName, + PortMap: map[string]int32{ + "vt": port, + }, + Keyspace: test.keyspace, + Shard: testShard, + } + + got := tr.isIncluded(tablet) + if got != test.expected { + t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected) + } + + if err := ts.CreateTablet(context.Background(), tablet); err != nil { + t.Errorf("CreateTablet failed: %v", err) + } + + tw.loadTablets() + key := TabletToMapKey(tablet) + allTablets := hc.GetAllTablets() + + if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tablet) != test.expected { + t.Errorf("Error adding tablet - got %v; want %v", ok, test.expected) + } + + // Replace the tablet we added above + tabletReplacement := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: rand.Uint32(), + }, + Hostname: testHostName, + PortMap: map[string]int32{ + "vt": port, + }, + Keyspace: test.keyspace, + Shard: testShard, + } + got = tr.isIncluded(tabletReplacement) + if got != test.expected { + t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected) + } + if err := ts.CreateTablet(context.Background(), tabletReplacement); err != nil { + t.Errorf("CreateTablet failed: %v", err) + } + + tw.loadTablets() + key = TabletToMapKey(tabletReplacement) + allTablets = hc.GetAllTablets() + + if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tabletReplacement) != test.expected { + t.Errorf("Error replacing tablet - got %v; want %v", ok, test.expected) + } + + // Delete the tablet + if err := ts.DeleteTablet(context.Background(), tabletReplacement.Alias); err != nil { + t.Fatalf("DeleteTablet failed: %v", err) + } + } +} diff --git a/go/vt/vtgate/gateway/discoverygateway.go b/go/vt/vtgate/gateway/discoverygateway.go index 68a7ee84de6..31e0c1d4300 100644 --- a/go/vt/vtgate/gateway/discoverygateway.go +++ b/go/vt/vtgate/gateway/discoverygateway.go @@ -127,6 +127,8 @@ func createDiscoveryGateway(ctx context.Context, hc discovery.HealthCheck, serv log.Exitf("Cannot parse tablet_filters parameter: %v", err) } tr = fbs + } else if len(KeyspacesToWatch) > 0 { + tr = discovery.NewFilterByKeyspace(dg.hc, KeyspacesToWatch) } ctw := discovery.NewCellTabletsWatcher(ctx, topoServer, tr, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency)