Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated backport of #983: Extend CreateOrUpdate to allow mutating before create #1001: Add IdentfyingLabels to CreateOrUpdateWithOptions #1004

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 82 additions & 29 deletions pkg/util/create_or_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,36 +65,65 @@ var backOff wait.Backoff = wait.Backoff{

var logger = log.Logger{Logger: logf.Log}

type CreateOrUpdateOptions[T runtime.Object] struct {
Client resource.Interface[T]
Obj T
MutateOnUpdate MutateFn[T]
MutateOnCreate MutateFn[T]
// IdentifyingLabels is used to find an existing resource if GenerateName is set in the target resource.
IdentifyingLabels map[string]string
}

func CreateOrUpdateWithOptions[T runtime.Object](ctx context.Context, options CreateOrUpdateOptions[T]) (OperationResult, T, error) {
return maybeCreateOrUpdate(ctx, options, opCreate)
}

// CreateOrUpdate tries to obtain an existing resource and, if not found, creates 'obj' otherwise updates it. The existing resource
// is normally retrieved via 'obj's Name field but if it's empty and the GenerateName field is non-empty, it will try to retrieve it
// via the List method using 'obj's Labels. This assumes that the labels uniquely identify the resource. If more than one resource is
// found, an error is returned.
func CreateOrUpdate[T runtime.Object](ctx context.Context, client resource.Interface[T], obj T, mutate MutateFn[T],
) (OperationResult, error) {
return maybeCreateOrUpdate(ctx, client, obj, mutate, opCreate)
r, _, err := CreateOrUpdateWithOptions(ctx, CreateOrUpdateOptions[T]{
Client: client,
Obj: obj,
MutateOnUpdate: mutate,
})

return r, err
}

// Update tries to obtain an existing resource and, if found, updates it. If not found, no error is returned.
func Update[T runtime.Object](ctx context.Context, client resource.Interface[T], obj T, mutate MutateFn[T]) error {
_, err := maybeCreateOrUpdate(ctx, client, obj, mutate, opUpdate)
_, _, err := maybeCreateOrUpdate(ctx, CreateOrUpdateOptions[T]{
Client: client,
Obj: obj,
MutateOnUpdate: mutate,
}, opUpdate)

return err
}

// Update tries to obtain an existing resource and, if found, updates it. If not found, a NotFound error is returned.
func MustUpdate[T runtime.Object](ctx context.Context, client resource.Interface[T], obj T, mutate MutateFn[T]) error {
_, err := maybeCreateOrUpdate(ctx, client, obj, mutate, opMustUpdate)
_, _, err := maybeCreateOrUpdate(ctx, CreateOrUpdateOptions[T]{
Client: client,
Obj: obj,
MutateOnUpdate: mutate,
}, opMustUpdate)

return err
}

func maybeCreateOrUpdate[T runtime.Object](ctx context.Context, client resource.Interface[T], obj T, mutate MutateFn[T],
op opType,
) (OperationResult, error) {
func maybeCreateOrUpdate[T runtime.Object](ctx context.Context, options CreateOrUpdateOptions[T], op opType) (OperationResult, T, error) {
var returnObj T

result := OperationResultNone

objMeta := resource.MustToMeta(obj)
objMeta := resource.MustToMeta(options.Obj)

err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
existing, err := getResource(ctx, client, obj)
existing, err := getResource(ctx, &options)
if apierrors.IsNotFound(err) {
if op != opCreate {
logger.V(log.LIBTRACE).Infof("Resource %q does not exist - not updating", objMeta.GetName())
Expand All @@ -106,19 +135,24 @@ func maybeCreateOrUpdate[T runtime.Object](ctx context.Context, client resource.
return nil
}

logger.V(log.LIBTRACE).Infof("Creating resource: %#v", obj)

result = OperationResultCreated
return createResource(ctx, client, obj)

returnObj, err = createResource(ctx, options.Client, options.Obj, options.MutateOnCreate)

return err
}

if err != nil {
return errors.Wrapf(err, "error retrieving %q", objMeta.GetName())
}

if options.MutateOnUpdate == nil {
return nil
}

origObj := resource.MustToUnstructuredUsingDefaultConverter(existing)

toUpdate, err := mutate(existing)
toUpdate, err := options.MutateOnUpdate(existing)
if err != nil {
return err
}
Expand All @@ -143,7 +177,7 @@ func maybeCreateOrUpdate[T runtime.Object](ctx context.Context, client resource.

// UpdateStatus for generic clients (eg dynamic client) will return NotFound error if the resource CRD
// doesn't have the status subresource so we'll ignore it.
updated, err := client.UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{})
updated, err := options.Client.UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{})
if err == nil {
unstructured.RemoveNestedField(origObj.Object, StatusField)
unstructured.RemoveNestedField(newObj.Object, StatusField)
Expand All @@ -157,31 +191,38 @@ func maybeCreateOrUpdate[T runtime.Object](ctx context.Context, client resource.
return nil
}

logger.V(log.LIBTRACE).Infof("Updating resource: %s", resource.ToJSON(obj))
logger.V(log.LIBTRACE).Infof("Updating resource: %s", resource.ToJSON(options.Obj))

result = OperationResultUpdated
_, err = client.Update(ctx, toUpdate, metav1.UpdateOptions{})
returnObj, err = options.Client.Update(ctx, toUpdate, metav1.UpdateOptions{})

return errors.Wrapf(err, "error updating %s", resource.ToJSON(toUpdate))
})
if err != nil {
return OperationResultNone, errors.Wrap(err, "error creating or updating resource")
return OperationResultNone, *new(T), errors.Wrap(err, "error creating or updating resource")
}

return result, nil
return result, returnObj, nil
}

//nolint:wrapcheck // No need to wrap errors
func getResource[T runtime.Object](ctx context.Context, client resource.Interface[T], obj T) (T, error) {
objMeta := resource.MustToMeta(obj)
func getResource[T runtime.Object](ctx context.Context, options *CreateOrUpdateOptions[T]) (T, error) {
objMeta := resource.MustToMeta(options.Obj)

if objMeta.GetName() != "" || objMeta.GetGenerateName() == "" {
obj, err := client.Get(ctx, objMeta.GetName(), metav1.GetOptions{})
obj, err := options.Client.Get(ctx, objMeta.GetName(), metav1.GetOptions{})
return obj, err
}

list, err := client.List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(objMeta.GetLabels()).String(),
var selectorLabels map[string]string
if len(options.IdentifyingLabels) > 0 {
selectorLabels = options.IdentifyingLabels
} else {
selectorLabels = objMeta.GetLabels()
}

list, err := options.Client.List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(selectorLabels).String(),
})
if err != nil {
return *new(T), err
Expand All @@ -199,17 +240,28 @@ func getResource[T runtime.Object](ctx context.Context, client resource.Interfac
return list[0], nil
}

func createResource[T runtime.Object](ctx context.Context, client resource.Interface[T], obj T) error {
func createResource[T runtime.Object](ctx context.Context, client resource.Interface[T], obj T, mutate MutateFn[T]) (T, error) {
if mutate != nil {
mutated, err := mutate(obj)
if err != nil {
return *new(T), err
}

obj = mutated
}

logger.V(log.LIBTRACE).Infof("Creating resource: %#v", obj)

objMeta := resource.MustToMeta(obj)

created, err := client.Create(ctx, obj, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
logger.V(log.LIBDEBUG).Infof("Resource %q already exists - retrying", objMeta.GetName())
return apierrors.NewConflict(schema.GroupResource{}, objMeta.GetName(), err)
return *new(T), apierrors.NewConflict(schema.GroupResource{}, objMeta.GetName(), err)
}

if err != nil {
return errors.Wrapf(err, "error creating %#v", obj)
return *new(T), errors.Wrapf(err, "error creating %#v", obj)
}

status, ok := GetNestedField(resource.MustToUnstructuredUsingDefaultConverter(obj), StatusField).(map[string]interface{})
Expand All @@ -218,14 +270,15 @@ func createResource[T runtime.Object](ctx context.Context, client resource.Inter
// do a separate UpdateStatus call.
objMeta.SetResourceVersion(resource.MustToMeta(created).GetResourceVersion())
objMeta.SetUID(resource.MustToMeta(created).GetUID())
objMeta.SetCreationTimestamp(resource.MustToMeta(created).GetCreationTimestamp())

_, err := client.UpdateStatus(ctx, obj, metav1.UpdateOptions{})
created, err = client.UpdateStatus(ctx, obj, metav1.UpdateOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error updating status for %#v", obj)
return *new(T), errors.Wrapf(err, "error updating status for %#v", obj)
}
}

return nil
return created, nil
}

// CreateAnew creates a resource, first deleting an existing instance if one exists.
Expand Down Expand Up @@ -292,7 +345,7 @@ func SetBackoff(b wait.Backoff) wait.Backoff {
}

func Replace[T runtime.Object](with T) MutateFn[T] {
return func(existing T) (T, error) {
return func(_ T) (T, error) {
return with, nil
}
}
54 changes: 52 additions & 2 deletions pkg/util/create_or_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,23 @@ var _ = Describe("CreateOrUpdate function", func() {
t := newCreateOrUpdateTestDiver()

createOrUpdate := func(expResult util.OperationResult) error {
result, err := util.CreateOrUpdate[*unstructured.Unstructured](context.TODO(), resource.ForDynamic(t.client),
resource.MustToUnstructured(t.pod), t.mutateFn)
options := util.CreateOrUpdateOptions[*unstructured.Unstructured]{
Client: resource.ForDynamic(t.client),
MutateOnUpdate: t.mutateFn,
}

if t.pod.GenerateName != "" {
options.IdentifyingLabels = map[string]string{}
for k, v := range t.pod.Labels {
options.IdentifyingLabels[k] = v
}

t.pod.Labels["new-label"] = "new-value"
}

options.Obj = resource.MustToUnstructured(t.pod)

result, _, err := util.CreateOrUpdateWithOptions[*unstructured.Unstructured](context.TODO(), options)
if err != nil && expResult != util.OperationResultNone {
return err
}
Expand All @@ -166,6 +181,41 @@ var _ = Describe("CreateOrUpdate function", func() {
tests.EnsureNoActionsForResource(t.testingFake, "pods/status", "update")
})

Context("and a mutation function specified", func() {
It("should invoke the function on create", func() {
result, created, err := util.CreateOrUpdateWithOptions[*unstructured.Unstructured](context.TODO(),
util.CreateOrUpdateOptions[*unstructured.Unstructured]{
Client: resource.ForDynamic(t.client),
Obj: resource.MustToUnstructured(t.pod),
MutateOnCreate: func(existing *unstructured.Unstructured) (*unstructured.Unstructured, error) {
existing.SetAnnotations(map[string]string{"on-create-invoked": "true"})
return existing, nil
},
})
Expect(err).To(Succeed())
Expect(result).To(Equal(util.OperationResultCreated))

actual := t.verifyPod()
Expect(actual.Annotations).To(HaveKeyWithValue("on-create-invoked", "true"))

Expect(resource.MustFromUnstructured(created, &corev1.Pod{})).To(Equal(actual))
})

Context("which returns an error", func() {
It("should return an error", func() {
_, _, err := util.CreateOrUpdateWithOptions[*unstructured.Unstructured](context.TODO(),
util.CreateOrUpdateOptions[*unstructured.Unstructured]{
Client: resource.ForDynamic(t.client),
Obj: resource.MustToUnstructured(t.pod),
MutateOnCreate: func(_ *unstructured.Unstructured) (*unstructured.Unstructured, error) {
return nil, errors.New("mutate failure")
},
})
Expect(err).To(HaveOccurred())
})
})
})

Context("and GenerateName is set", func() {
BeforeEach(func() {
t.pod.Name = ""
Expand Down
Loading