Skip to content

Commit

Permalink
Emit event logs while awaiting readiness or deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
blampe committed Aug 9, 2024
1 parent 9a5a997 commit f9453ac
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
Existing readiness logic is unaffected by this setting.
(https://github.com/pulumi/pulumi-kubernetes/issues/2996)

- Pulumi will now emit logs for any Kubernetes "Warning" Events associated with
resources being created, updated or deleted.
(https://github.com/pulumi/pulumi-kubernetes/pull/3135/files)

### Fixed

- Fixed a panic that could occur during deletion. (https://github.com/pulumi/pulumi-kubernetes/issues/3157)
Expand Down
9 changes: 9 additions & 0 deletions provider/pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {

awaiter, err := internal.NewAwaiter(
internal.WithCondition(ready),
internal.WithObservers(
NewEventAggregator(ctx, source, c.DedupLogger, outputs),
),
internal.WithNamespace(outputs.GetNamespace()),
internal.WithLogger(c.DedupLogger),
)
Expand Down Expand Up @@ -456,6 +459,9 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {

awaiter, err := internal.NewAwaiter(
internal.WithCondition(ready),
internal.WithObservers(
NewEventAggregator(ctx, source, c.DedupLogger, currentOutputs),
),
internal.WithNamespace(currentOutputs.GetNamespace()),
internal.WithLogger(c.DedupLogger),
)
Expand Down Expand Up @@ -847,6 +853,9 @@ func Deletion(c DeleteConfig) error {

awaiter, err := internal.NewAwaiter(
internal.WithCondition(deleted),
internal.WithObservers(
NewEventAggregator(ctx, source, c.DedupLogger, c.Outputs),
),
internal.WithNamespace(c.Outputs.GetNamespace()),
internal.WithLogger(c.DedupLogger),
)
Expand Down
99 changes: 93 additions & 6 deletions provider/pkg/await/watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,28 @@
package await

import (
"context"
"fmt"
"strings"
"sync"

"github.com/pulumi/cloud-ready-checks/pkg/checker"
"github.com/pulumi/cloud-ready-checks/pkg/checker/logging"
checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging"

"github.com/pulumi/cloud-ready-checks/pkg/kubernetes/pod"
"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition"
"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
logger "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)

var _ condition.Observer = (*Aggregator[*corev1.Event])(nil)

// PodAggregator tracks status for any Pods related to the owner resource, and writes
// warning/error messages to a channel that can be consumed by a resource awaiter.
type PodAggregator struct {
Expand All @@ -46,7 +54,7 @@ type PodAggregator struct {
lister lister

// Messages
messages chan logging.Messages
messages chan checkerlog.Messages
}

// lister lists resources matching a label selector.
Expand All @@ -60,7 +68,7 @@ func NewPodAggregator(owner *unstructured.Unstructured, lister lister) *PodAggre
owner: owner,
lister: lister,
checker: pod.NewPodChecker(),
messages: make(chan logging.Messages),
messages: make(chan checkerlog.Messages),
}
return pa
}
Expand Down Expand Up @@ -103,8 +111,8 @@ func (pa *PodAggregator) run(informChan <-chan watch.Event) {
}

// Read lists existing Pods and returns any related warning/error messages.
func (pa *PodAggregator) Read() logging.Messages {
var messages logging.Messages
func (pa *PodAggregator) Read() checkerlog.Messages {
var messages checkerlog.Messages
checkPod := func(object runtime.Object) {
obj := object.(*unstructured.Unstructured)
pod, err := clients.PodFromUnstructured(obj)
Expand Down Expand Up @@ -149,7 +157,7 @@ func (pa *PodAggregator) stopping() bool {

// ResultChan returns a reference to the message channel used by the PodAggregator to
// communicate warning/error messages to a resource awaiter.
func (pa *PodAggregator) ResultChan() <-chan logging.Messages {
func (pa *PodAggregator) ResultChan() <-chan checkerlog.Messages {
return pa.messages
}

Expand All @@ -167,3 +175,82 @@ func (s *staticLister) List(_ labels.Selector) (ret []runtime.Object, err error)
}
return objects, nil
}

// Aggregator is a generic, stateless condition.Observer intended for reporting
// informational messages about related resources during an Await.
type Aggregator[T runtime.Object] struct {
observer condition.Observer
callback func(logMessager, T) error
logger logMessager
}

// NewAggregator creates a new Aggregator for the given runtime type. The
// provided condition.Observer must be configured for the corresponding GVK.
func NewAggregator[T runtime.Object](
observer condition.Observer,
logger logMessager,
callback func(logMessager, T) error,
) *Aggregator[T] {
return &Aggregator[T]{
observer: observer,
callback: callback,
logger: logger,
}
}

func (i *Aggregator[T]) Observe(e watch.Event) error {
obj, ok := e.Object.(*unstructured.Unstructured)
if !ok {
return nil
}
var t T
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &t)
if err != nil {
return err
}
return i.callback(i.logger, t)
}

func (i *Aggregator[T]) Range(yield func(watch.Event) bool) {
i.observer.Range(yield)
}

// NewEventAggregator creates a new condition.Observer subscribed to Kubernetes
// events related to the owner object. Event messages are logged at WARN
// severity if the event has type Warning; Normal events are discarded to
// reduce noise.
func NewEventAggregator(
ctx context.Context,
source condition.Source,
logger logMessager,
owner *unstructured.Unstructured,
) condition.Observer {
observer := condition.NewObserver(ctx,
source,
corev1.SchemeGroupVersion.WithKind("events"),
func(obj *unstructured.Unstructured) bool {
uid, _, _ := unstructured.NestedString(obj.Object, "involvedObject", "uid")
return uid == string(owner.GetUID())
},
)
return NewAggregator(observer, logger,
func(l logMessager, e *corev1.Event) error {
if e == nil || e.Type != corev1.EventTypeWarning {
return nil
}
msg := fmt.Sprintf(
"[%s/%s] %s: %s",
strings.ToLower(e.InvolvedObject.Kind),
e.InvolvedObject.Name,
e.Reason,
e.Message,
)
logger.LogMessage(checkerlog.WarningMessage(msg))
return nil
},
)
}

type logMessager interface {
LogMessage(checkerlog.Message)
}
133 changes: 133 additions & 0 deletions provider/pkg/await/watchers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2024, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package await

import (
"context"
"fmt"
"io"
"strings"
"testing"
"time"

checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging"
"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)

func TestEventAggregator(t *testing.T) {
owner := &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "pulumi.com/v1",
"kind": "Stack",
"metadata": map[string]any{
"name": "my-stack-28073241",
"namespace": "operator",
"uid": "9bd08f1a-fa5b-40a5-ba41-bf69899a4416",
},
}}

tests := []struct {
name string
event watch.Event
want string
}{
{
name: "related warning",
event: watch.Event{Type: watch.Added, Object: &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "v1",
"kind": "Event",
"reason": "StackUpdateFailure",
"type": "Warning",
"message": "Failed to update Stack",
"involvedObject": map[string]any{
"kind": "Stack",
"name": "my-stack-28073241",
"uid": "9bd08f1a-fa5b-40a5-ba41-bf69899a4416",
},
}}},
want: "warning [stack/my-stack-28073241] StackUpdateFailure: Failed to update Stack",
},
{
name: "related info",
event: watch.Event{Type: watch.Added, Object: &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "v1",
"kind": "Event",
"reason": "Killing",
"type": "Normal",
"message": "frog blast the vent core",
"involvedObject": map[string]any{
"kind": "Pod",
"name": "mypod-7854ff8877-p9ksk",
"uid": "9bd08f1a-fa5b-40a5-ba41-bf69899a4416",
},
}}},
want: "",
},
{
name: "unrelated warning",
event: watch.Event{Type: watch.Added, Object: &unstructured.Unstructured{Object: map[string]any{
"apiVersion": "v1",
"kind": "Event",
"reason": "Killing",
"type": "Warning",
"message": "Stopping container nginx",
"involvedObject": map[string]any{
"kind": "Pod",
"name": "some-other-name",
"uid": "some-other-uid",
},
}}},
want: "",
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

source := condition.Static(make(chan watch.Event, 1))

buf := &strings.Builder{}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

obs := NewEventAggregator(context.Background(), source, log{buf}, owner)

seen := make(chan struct{})
go obs.Range(func(e watch.Event) bool {
_ = obs.Observe(e)
seen <- struct{}{}
return true
})

source <- tt.event
select {
case <-seen:
case <-ctx.Done():
}
assert.Equal(t, buf.String(), tt.want)
})
}
}

type log struct{ io.Writer }

func (l log) LogMessage(m checkerlog.Message) {
fmt.Fprintf(l, "%s %s", m.Severity, m.String())
}

0 comments on commit f9453ac

Please sign in to comment.