-
Notifications
You must be signed in to change notification settings - Fork 53
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add catalogsource entitysource (#129)
* add catalogsource entitysource Signed-off-by: Ankita Thomas <[email protected]> * move catalogsource querier out to new controller Signed-off-by: Ankita Thomas <[email protected]> * Hook up reconciler to main Signed-off-by: perdasilva <[email protected]> * fix unit tests and manifests Signed-off-by: Ankita Thomas <[email protected]> * Switch from hardcoded data source to catalog source entity source Signed-off-by: perdasilva <[email protected]> * Move entity_source pkg outside of variable_source pkg Signed-off-by: perdasilva <[email protected]> * Add vendor to .gitignore Signed-off-by: perdasilva <[email protected]> * Remove registry cache querier Signed-off-by: perdasilva <[email protected]> * Add initial catalog source controller unit tests Signed-off-by: perdasilva <[email protected]> * fixing unit tests, removing unused files Signed-off-by: Ankita Thomas <[email protected]> * Refactor catsrc controller unit tests Signed-off-by: perdasilva <[email protected]> * update go.mod Signed-off-by: perdasilva <[email protected]> * fix format, imports, and lint Signed-off-by: perdasilva <[email protected]> * add rbac for issuing events Signed-off-by: perdasilva <[email protected]> * add catsrc controller e2e tests Signed-off-by: Varsha Prasad Narsing <[email protected]> * add e2e tests Signed-off-by: Varsha Prasad Narsing <[email protected]> * Fix e2e catalogsource image Signed-off-by: Ankita Thomas <[email protected]> --------- Signed-off-by: Ankita Thomas <[email protected]> Signed-off-by: perdasilva <[email protected]> Signed-off-by: Varsha Prasad Narsing <[email protected]> Co-authored-by: perdasilva <[email protected]> Co-authored-by: Varsha Prasad Narsing <[email protected]>
- Loading branch information
1 parent
f4b4bba
commit 2973a14
Showing
23 changed files
with
1,851 additions
and
38 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
vendor | ||
|
||
# Binaries for programs and plugins | ||
*.exe | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
package controllers | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"reflect" | ||
"sync" | ||
"time" | ||
|
||
"github.com/operator-framework/api/pkg/operators/v1alpha1" | ||
"github.com/operator-framework/deppy/pkg/deppy" | ||
"github.com/operator-framework/deppy/pkg/deppy/input" | ||
"k8s.io/apimachinery/pkg/api/errors" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/client-go/tools/record" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/log" | ||
|
||
"github.com/operator-framework/operator-controller/internal/resolution/entity_sources/catalogsource" | ||
) | ||
|
||
const ( | ||
defaultCatalogSourceSyncInterval = 5 * time.Minute | ||
defaultRegistryGRPCConnectionTimeout = 10 * time.Second | ||
|
||
eventTypeNormal = "Normal" | ||
eventTypeWarning = "Warning" | ||
|
||
eventReasonCacheUpdated = "BundleCacheUpdated" | ||
eventReasonCacheUpdateFailed = "BundleCacheUpdateFailed" | ||
) | ||
|
||
type CatalogSourceReconcilerOption func(reconciler *CatalogSourceReconciler) | ||
|
||
func WithRegistryClient(registry catalogsource.RegistryClient) CatalogSourceReconcilerOption { | ||
return func(reconciler *CatalogSourceReconciler) { | ||
reconciler.registry = registry | ||
} | ||
} | ||
|
||
func WithUnmanagedCatalogSourceSyncInterval(interval time.Duration) CatalogSourceReconcilerOption { | ||
return func(reconciler *CatalogSourceReconciler) { | ||
reconciler.unmanagedCatalogSourceSyncInterval = interval | ||
} | ||
} | ||
|
||
// applyDefaults applies default values to empty CatalogSourceReconciler fields _after_ options have been applied | ||
func applyDefaults() CatalogSourceReconcilerOption { | ||
return func(reconciler *CatalogSourceReconciler) { | ||
if reconciler.registry == nil { | ||
reconciler.registry = catalogsource.NewRegistryGRPCClient(defaultRegistryGRPCConnectionTimeout) | ||
} | ||
if reconciler.unmanagedCatalogSourceSyncInterval == 0 { | ||
reconciler.unmanagedCatalogSourceSyncInterval = defaultCatalogSourceSyncInterval | ||
} | ||
} | ||
} | ||
|
||
type CatalogSourceReconciler struct { | ||
sync.RWMutex | ||
client.Client | ||
scheme *runtime.Scheme | ||
registry catalogsource.RegistryClient | ||
recorder record.EventRecorder | ||
unmanagedCatalogSourceSyncInterval time.Duration | ||
cache map[string]map[deppy.Identifier]*input.Entity | ||
} | ||
|
||
func NewCatalogSourceReconciler(client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, options ...CatalogSourceReconcilerOption) *CatalogSourceReconciler { | ||
reconciler := &CatalogSourceReconciler{ | ||
RWMutex: sync.RWMutex{}, | ||
Client: client, | ||
scheme: scheme, | ||
recorder: recorder, | ||
unmanagedCatalogSourceSyncInterval: 0, | ||
cache: map[string]map[deppy.Identifier]*input.Entity{}, | ||
} | ||
// apply options | ||
options = append(options, applyDefaults()) | ||
for _, option := range options { | ||
option(reconciler) | ||
} | ||
|
||
return reconciler | ||
} | ||
|
||
// +kubebuilder:rbac:groups=operators.coreos.com,resources=catalogsources,verbs=get;list;watch | ||
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch | ||
|
||
func (r *CatalogSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { | ||
l := log.FromContext(ctx).WithName("catalogsource-controller") | ||
l.V(1).Info("starting") | ||
defer l.V(1).Info("ending") | ||
|
||
var catalogSource = &v1alpha1.CatalogSource{} | ||
if err := r.Client.Get(ctx, req.NamespacedName, catalogSource); err != nil { | ||
if errors.IsNotFound(err) { | ||
r.dropSource(req.String()) | ||
} | ||
return ctrl.Result{}, client.IgnoreNotFound(err) | ||
} | ||
|
||
entities, err := r.registry.ListEntities(ctx, catalogSource) | ||
// TODO: invalidate stale cache for failed updates | ||
if err != nil { | ||
r.recorder.Event(catalogSource, eventTypeWarning, eventReasonCacheUpdateFailed, fmt.Sprintf("Failed to update bundle cache from %s/%s: %v", catalogSource.GetNamespace(), catalogSource.GetName(), err)) | ||
return ctrl.Result{Requeue: !isManagedCatalogSource(*catalogSource)}, err | ||
} | ||
if updated := r.updateCache(req.String(), entities); updated { | ||
r.recorder.Event(catalogSource, eventTypeNormal, eventReasonCacheUpdated, fmt.Sprintf("Successfully updated bundle cache from %s/%s", catalogSource.GetNamespace(), catalogSource.GetName())) | ||
} | ||
|
||
if isManagedCatalogSource(*catalogSource) { | ||
return ctrl.Result{}, nil | ||
} | ||
return ctrl.Result{RequeueAfter: r.unmanagedCatalogSourceSyncInterval}, nil | ||
} | ||
|
||
// SetupWithManager sets up the controller with the Manager. | ||
func (r *CatalogSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { | ||
return ctrl.NewControllerManagedBy(mgr). | ||
For(&v1alpha1.CatalogSource{}). | ||
Complete(r) | ||
} | ||
|
||
// TODO: find better way to identify catalogSources unmanaged by olm | ||
func isManagedCatalogSource(catalogSource v1alpha1.CatalogSource) bool { | ||
return len(catalogSource.Spec.Address) == 0 | ||
} | ||
|
||
func (r *CatalogSourceReconciler) updateCache(sourceID string, entities []*input.Entity) bool { | ||
newSourceCache := make(map[deppy.Identifier]*input.Entity) | ||
for _, entity := range entities { | ||
newSourceCache[entity.Identifier()] = entity | ||
} | ||
if _, ok := r.cache[sourceID]; ok && reflect.DeepEqual(r.cache[sourceID], newSourceCache) { | ||
return false | ||
} | ||
r.RWMutex.Lock() | ||
defer r.RWMutex.Unlock() | ||
r.cache[sourceID] = newSourceCache | ||
// return whether cache had updates | ||
return true | ||
} | ||
|
||
func (r *CatalogSourceReconciler) dropSource(sourceID string) { | ||
r.RWMutex.Lock() | ||
defer r.RWMutex.Unlock() | ||
delete(r.cache, sourceID) | ||
} | ||
|
||
func (r *CatalogSourceReconciler) Get(ctx context.Context, id deppy.Identifier) *input.Entity { | ||
r.RWMutex.RLock() | ||
defer r.RWMutex.RUnlock() | ||
// don't count on deppy ID to reflect its catalogsource | ||
for _, source := range r.cache { | ||
if entity, ok := source[id]; ok { | ||
return entity | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (r *CatalogSourceReconciler) Filter(ctx context.Context, filter input.Predicate) (input.EntityList, error) { | ||
resultSet := input.EntityList{} | ||
if err := r.Iterate(ctx, func(entity *input.Entity) error { | ||
if filter(entity) { | ||
resultSet = append(resultSet, *entity) | ||
} | ||
return nil | ||
}); err != nil { | ||
return nil, err | ||
} | ||
return resultSet, nil | ||
} | ||
|
||
func (r *CatalogSourceReconciler) GroupBy(ctx context.Context, fn input.GroupByFunction) (input.EntityListMap, error) { | ||
resultSet := input.EntityListMap{} | ||
if err := r.Iterate(ctx, func(entity *input.Entity) error { | ||
keys := fn(entity) | ||
for _, key := range keys { | ||
resultSet[key] = append(resultSet[key], *entity) | ||
} | ||
return nil | ||
}); err != nil { | ||
return nil, err | ||
} | ||
return resultSet, nil | ||
} | ||
|
||
func (r *CatalogSourceReconciler) Iterate(ctx context.Context, fn input.IteratorFunction) error { | ||
r.RWMutex.RLock() | ||
defer r.RWMutex.RUnlock() | ||
for _, source := range r.cache { | ||
for _, entity := range source { | ||
if err := fn(entity); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
return nil | ||
} |
Oops, something went wrong.