Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement multiplexer proxy. #2141

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

zyjhtangtang
Copy link
Contributor

What type of PR is this?

/kind feature

What this PR does / why we need it:

Implement the shared cache module in node-level traffic multiplexing, referencing the Design Proposal.

Does this PR introduce a user-facing change?

"NONE"

Copy link

codecov bot commented Aug 28, 2024

Codecov Report

Attention: Patch coverage is 48.54142% with 441 lines in your changes missing coverage. Please review.

Project coverage is 45.28%. Comparing base (7763e7c) to head (a3d603c).
Report is 45 commits behind head on master.

Files with missing lines Patch % Lines
.../yurthub/proxy/multiplexer/watchembeddedencoder.go 10.66% 130 Missing and 4 partials ⚠️
pkg/yurthub/proxy/multiplexer/multiplexerwatch.go 51.21% 43 Missing and 17 partials ⚠️
pkg/yurthub/multiplexer/storage/fake_storage.go 0.00% 49 Missing ⚠️
pkg/yurthub/proxy/multiplexer/multiplexerlist.go 51.42% 19 Missing and 15 partials ⚠️
pkg/yurthub/proxy/multiplexer/multiplexerproxy.go 72.95% 21 Missing and 12 partials ⚠️
pkg/yurthub/multiplexer/manager.go 73.33% 10 Missing and 10 partials ⚠️
pkg/yurthub/proxy/util/util.go 0.00% 19 Missing ⚠️
pkg/yurthub/proxy/proxy.go 0.00% 14 Missing ⚠️
pkg/yurthub/filter/interfaces.go 0.00% 13 Missing ⚠️
pkg/yurthub/filter/manager/manager.go 0.00% 10 Missing ⚠️
... and 12 more
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2141       +/-   ##
===========================================
- Coverage   58.93%   45.28%   -13.65%     
===========================================
  Files         210      420      +210     
  Lines       18968    28631     +9663     
===========================================
+ Hits        11179    12966     +1787     
- Misses       6707    14367     +7660     
- Partials     1082     1298      +216     
Flag Coverage Δ
unittests 45.28% <48.54%> (-13.65%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@zyjhtangtang zyjhtangtang force-pushed the master branch 3 times, most recently from 44acb84 to 8ecdab1 Compare September 14, 2024 07:49
Copy link

@zyjhtangtang zyjhtangtang force-pushed the master branch 2 times, most recently from 4171c61 to aed7925 Compare October 21, 2024 07:32
@zyjhtangtang zyjhtangtang changed the title Implement shared cache. Implement multiplexer proxy. Nov 13, 2024
@zyjhtangtang zyjhtangtang force-pushed the master branch 3 times, most recently from 4f1ad5f to d6c4a47 Compare November 14, 2024 01:34
@@ -86,13 +86,15 @@ const (
CacheUserAgentsKey = "cache_agents"
PoolScopeResourcesKey = "pool_scope_resources"

MultiplexerProxyClientUserAgent = "multiplexer-proxy"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about name MultiplexerProxyClientUserAgent to fmt.Sprintf("multiplexer-proxy-%s", strings.ToLower(nodeName))?

Copy link
Contributor Author

@zyjhtangtang zyjhtangtang Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

func (sm *storageManager) restClient(gvr *schema.GroupVersionResource) (rest.Interface, error) {
httpClient, _ := rest.HTTPClientFor(sm.config)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not skip err here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return nil, errors.Wrapf(err, "failed to get rest client for %v", gvr)
}

rs := &store{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rs := NewStore(restClient, gvr.Resource)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@zyjhtangtang zyjhtangtang force-pushed the master branch 2 times, most recently from a2f0237 to a3d603c Compare December 4, 2024 09:08
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var DefaultMultiplexerResources = []schema.GroupVersionResource{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DefaultMultiplexerResources --> AllowedMultiplexerResources

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -101,6 +117,9 @@ type YurtHubConfiguration struct {
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
HostControlPlaneAddr string // ip:port
PostStartHooks map[string]func() error
MultiplexerCacheManager multiplexer.MultiplexerManager
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MultiplexerCacheManager --> RequestMultiplexerManager

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

type NodeGetter func(name string) (*v1.Node, error)

type UnionObjectFilter []ObjectFilter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we can use filterChain instead of defining a new UnionObjectFilter.

type filterChain []filter.ObjectFilter

by the way, maybe you can create a new folder named objectfilter and move filterchain into this folder.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zyjhtangtang please remove UnionObjectFilter

@@ -59,4 +59,11 @@ type ObjectFilter interface {
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object
}

type FilterManager interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about rename FilterManager to FilterFinder?


if info.Verb != "list" && info.Verb != "watch" {
return false
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also need to exclude requests with fieldSelector or labelSelector.

sp.multiplexerList(w, r, gvr)
case "watch":
sp.multiplexerWatch(w, r, gvr)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add default case, like only support list/watch requests.


type storageManager struct {
config *rest.Config
storageMap map[string]storage.Interface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

storageMap --> gvrToStorage

restStoreManager: restStoreManager,
restMapper: kmeta.NewDefaultRESTMapperFromScheme(),
cacheMap: make(map[string]Interface),
cacheConfigMap: make(map[string]*ResourceCacheConfig),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cacheConfigMap --> gvrToCacheConfig

return &multiplexerManager{
restStoreManager: restStoreManager,
restMapper: kmeta.NewDefaultRESTMapperFromScheme(),
cacheMap: make(map[string]Interface),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cacheMap --> gvrToCache

restMapper: kmeta.NewDefaultRESTMapperFromScheme(),
cacheMap: make(map[string]Interface),
cacheConfigMap: make(map[string]*ResourceCacheConfig),
cacheDestroyFuncMap: make(map[string]func()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cacheDestroyFuncMap --> gvrToCacheDestroyFunc

}
}
func (m *multiplexerManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) {
if config, ok := m.cacheConfigMap[gvr.String()]; ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to add a sync lock for cacheConfigMap in order to get rid of race conditioning.

}

func (sp *multiplexerProxy) filterListObject(obj runtime.Object, filter filter.ObjectFilter) (runtime.Object, error) {
if filter == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filter may not be nil because of it is a filter.ObjectFilter. so please use yurtutil.IsNil() func to verify.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants