Skip to content

Commit

Permalink
Implement shared cache.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyjhtangtang committed Aug 28, 2024
1 parent fbfa01f commit e49d50f
Show file tree
Hide file tree
Showing 16 changed files with 1,454 additions and 10 deletions.
16 changes: 8 additions & 8 deletions pkg/yurthub/filter/servicetopology/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
discoveryv1 "k8s.io/api/discovery/v1"
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -130,7 +130,7 @@ func (stf *serviceTopologyFilter) Filter(obj runtime.Object, stopCh <-chan struc
}

switch v := obj.(type) {
case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discovery.EndpointSlice:
case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discoveryv1.EndpointSlice:
return stf.serviceTopologyHandler(v)
default:
return obj
Expand Down Expand Up @@ -164,9 +164,9 @@ func (stf *serviceTopologyFilter) resolveServiceTopologyType(obj runtime.Object)
case *discoveryV1beta1.EndpointSlice:
svcNamespace = v.Namespace
svcName = v.Labels[discoveryV1beta1.LabelServiceName]
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
svcNamespace = v.Namespace
svcName = v.Labels[discovery.LabelServiceName]
svcName = v.Labels[discoveryv1.LabelServiceName]
case *v1.Endpoints:
svcNamespace = v.Namespace
svcName = v.Name
Expand All @@ -190,7 +190,7 @@ func (stf *serviceTopologyFilter) nodeTopologyHandler(obj runtime.Object) runtim
switch v := obj.(type) {
case *discoveryV1beta1.EndpointSlice:
return reassembleV1beta1EndpointSlice(v, stf.nodeName, nil)
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
return reassembleEndpointSlice(v, stf.nodeName, nil)
case *v1.Endpoints:
return reassembleEndpoints(v, stf.nodeName, nil)
Expand All @@ -215,7 +215,7 @@ func (stf *serviceTopologyFilter) nodePoolTopologyHandler(obj runtime.Object) ru
switch v := obj.(type) {
case *discoveryV1beta1.EndpointSlice:
return reassembleV1beta1EndpointSlice(v, "", nodes)
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
return reassembleEndpointSlice(v, "", nodes)
case *v1.Endpoints:
return reassembleEndpoints(v, "", nodes)
Expand Down Expand Up @@ -252,13 +252,13 @@ func reassembleV1beta1EndpointSlice(endpointSlice *discoveryV1beta1.EndpointSlic
}

// reassembleEndpointSlice will discard endpoints that are not on the same node/nodePool for v1.EndpointSlice
func reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice, nodeName string, nodes []string) *discovery.EndpointSlice {
func reassembleEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, nodeName string, nodes []string) *discoveryv1.EndpointSlice {
if len(nodeName) != 0 && len(nodes) != 0 {
klog.Warningf("reassembleEndpointSlice: nodeName(%s) and nodePool can not be set at the same time", nodeName)
return endpointSlice
}

var newEps []discovery.Endpoint
var newEps []discoveryv1.Endpoint
for i := range endpointSlice.Endpoints {
if len(nodeName) != 0 {
if *endpointSlice.Endpoints[i].NodeName == nodeName {
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func RunYurtHubServers(cfg *config.YurtHubConfiguration,
proxyHandler http.Handler,
rest *rest.RestConfigManager,
stopCh <-chan struct{}) error {

hubServerHandler := mux.NewRouter()
registerHandlers(hubServerHandler, cfg, rest)

Expand Down Expand Up @@ -72,7 +73,6 @@ func RunYurtHubServers(cfg *config.YurtHubConfiguration,
return err
}
}

return nil
}

Expand Down
74 changes: 74 additions & 0 deletions pkg/yurthub/sharecache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sharecache

import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
kstorage "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/client-go/kubernetes/scheme"
)

type Interface interface {
Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error)
GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error
}

type ResourceCacheConfig struct {
KeyFunc func(runtime.Object) (string, error)
NewFunc func() runtime.Object
NewListFunc func() runtime.Object
GetAttrsFunc kstorage.AttrFunc
NamespaceScoped bool
}

func NewResourceCache(
s kstorage.Interface,
resource *schema.GroupVersionResource,
config *ResourceCacheConfig) (Interface, func(), error) {

cacheConfig := cacher.Config{
Storage: s,
Versioner: kstorage.APIObjectVersioner{},
GroupResource: resource.GroupResource(),
KeyFunc: config.KeyFunc,
NewFunc: config.NewFunc,
NewListFunc: config.NewListFunc,
GetAttrsFunc: config.GetAttrsFunc,
Codec: scheme.Codecs.LegacyCodec(resource.GroupVersion()),
}

cacher, err := cacher.NewCacherFromConfig(cacheConfig)
if err != nil {
return nil, func() {}, fmt.Errorf("failed to new cacher from config, error: %v", err)

Check warning on line 63 in pkg/yurthub/sharecache/cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/sharecache/cache.go#L63

Added line #L63 was not covered by tests
}

var once sync.Once
destroyFunc := func() {
once.Do(func() {
cacher.Stop()
})

Check warning on line 70 in pkg/yurthub/sharecache/cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/sharecache/cache.go#L68-L70

Added lines #L68 - L70 were not covered by tests
}

return cacher, destroyFunc, nil
}
135 changes: 135 additions & 0 deletions pkg/yurthub/sharecache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sharecache

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/storage"

ystorage "github.com/openyurtio/openyurt/pkg/yurthub/sharecache/storage"
)

var serviceKeyFunc = func(obj runtime.Object) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return "", err
}

return registry.NamespaceKeyFunc(request.WithNamespace(request.NewContext(), accessor.GetNamespace()), "", accessor.GetName())
}

var newServiceFunc = func() runtime.Object {
return &v1.Service{}
}

var newServiceListFunc = func() runtime.Object {
return &v1.ServiceList{}
}

func TestResourceCache_GetList(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage(),
serviceGVR,
&ResourceCacheConfig{
serviceKeyFunc,
newServiceFunc,
newServiceListFunc,
storage.DefaultNamespaceScopedAttr,
true,
},
)

assert.Nil(t, err)
assertCacheGetList(t, cache)
}

func mockListOptions() storage.ListOptions {
return storage.ListOptions{
ResourceVersion: "100",
Recursive: true,
Predicate: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
},
}
}

func assertCacheGetList(t testing.TB, cache Interface) {
t.Helper()

serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), "", mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, 1, len(serviceList.Items))
}

func TestResourceCache_Watch(t *testing.T) {
fakeStorage := ystorage.NewFakeServiceStorage()

cache, _, err := NewResourceCache(
fakeStorage,
serviceGVR,
&ResourceCacheConfig{
serviceKeyFunc,
newServiceFunc,
newServiceListFunc,
storage.DefaultNamespaceScopedAttr,
true,
},
)

assert.Nil(t, err)
assertCacheWatch(t, cache, fakeStorage)
}

func mockWatchOptions() storage.ListOptions {
var sendInitialEvents = true

return storage.ListOptions{
ResourceVersion: "100",
Predicate: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
},
Recursive: true,
SendInitialEvents: &sendInitialEvents,
}
}

func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceStorage) {
receive, err := cache.Watch(context.TODO(), "", mockWatchOptions())

go func() {
fs.ModifyWatchObject()
}()

assert.Nil(t, err)
event := <-receive.ResultChan()
assert.Equal(t, watch.Modified, event.Type)
}
83 changes: 83 additions & 0 deletions pkg/yurthub/sharecache/fake_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sharecache

import (
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/storage"

ystorage "github.com/openyurtio/openyurt/pkg/yurthub/sharecache/storage"
)

var serviceGVR = &schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "services",
}

var endpointSliceGVR = &schema.GroupVersionResource{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
}

type FakeCacheManager struct {
cacheMap map[string]Interface
resourceConfigMap map[string]*ResourceCacheConfig
}

func NewFakeCacheManager() *FakeCacheManager {
return &FakeCacheManager{
map[string]Interface{
serviceGVR.String(): ystorage.NewFakeServiceStorage(),
endpointSliceGVR.String(): ystorage.NewFakeEndpointSliceStorage(),
},
map[string]*ResourceCacheConfig{
serviceGVR.String(): {
KeyFunc: NamespaceKeyFunc,
NewListFunc: func() runtime.Object {
return &v1.ServiceList{}
},
NewFunc: func() runtime.Object {
return &v1.Service{}
},

Check warning on line 60 in pkg/yurthub/sharecache/fake_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/sharecache/fake_manager.go#L46-L60

Added lines #L46 - L60 were not covered by tests
GetAttrsFunc: storage.DefaultNamespaceScopedAttr,
},
endpointSliceGVR.String(): {
KeyFunc: NamespaceKeyFunc,
NewListFunc: func() runtime.Object {
return &discovery.EndpointSliceList{}
},
NewFunc: func() runtime.Object {
return &discovery.EndpointSlice{}
},

Check warning on line 70 in pkg/yurthub/sharecache/fake_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/sharecache/fake_manager.go#L65-L70

Added lines #L65 - L70 were not covered by tests
GetAttrsFunc: storage.DefaultNamespaceScopedAttr,
},
},
}
}

func (fcm *FakeCacheManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) {
return fcm.resourceConfigMap[gvr.String()], nil

Check warning on line 78 in pkg/yurthub/sharecache/fake_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/sharecache/fake_manager.go#L77-L78

Added lines #L77 - L78 were not covered by tests
}

func (fcm *FakeCacheManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) {
return fcm.cacheMap[gvr.String()], nil, nil

Check warning on line 82 in pkg/yurthub/sharecache/fake_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/sharecache/fake_manager.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}
Loading

0 comments on commit e49d50f

Please sign in to comment.