Skip to content

Commit

Permalink
Updating node and pod metrics
Browse files — browse the repository at this point in the history
  • Loading branch information
mattmattox committed Jul 31, 2023
1 parent d0daa59 commit 2a58e9d
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 55 deletions.
83 changes: 46 additions & 37 deletions controllers/node_monitor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewNodeMonitorController(clientset KubernetesClientset, logger *logrus.Logg
}

func (n *NodeMonitorController) Start() error {
monitoring.ControllerStatus("nodeMonitorController", true)
monitoring.ControllerStatus("NodeMonitorController", true)
// Create a shared informer factory
factory := informers.NewSharedInformerFactory(n.Clientset, time.Minute)
logrus.Debug("Created shared informer factory")
Expand Down Expand Up @@ -56,34 +56,31 @@ func (n *NodeMonitorController) Stop() error {
return nil
}

// logNodeDetails writes a one-line Info entry identifying the event, the node,
// and its ready status, followed by a verbose Debug dump of the node's state
// (status, conditions, metadata, taints, and resource/address details).
func (n *NodeMonitorController) logNodeDetails(event string, node *v1.Node, status v1.ConditionStatus) {
	n.Logger.Infof("Node %s: %s/%s", event, node.Name, status)
	// Debug dump keeps the original field order so log output is unchanged.
	n.Logger.Debug("Node Details: ",
		node.Status,
		node.Status.Conditions,
		node.Annotations,
		node.Labels,
		node.Finalizers,
		node.Spec.Taints,
		node.Status.Capacity,
		node.Status.Allocatable,
		node.Status.Addresses)
}

// onAdd handles node-add events from the informer. It logs the node via the
// shared logNodeDetails helper and registers its metrics as a new entry.
//
// Fixes: the rendered block still contained the pre-refactor logging lines
// (duplicating logNodeDetails) — removed; the bare type assertion is now
// checked so a malformed event cannot panic the informer goroutine.
func (n *NodeMonitorController) onAdd(obj interface{}) {
	node, ok := obj.(*v1.Node)
	if !ok {
		n.Logger.Error("Failed to convert obj to Node")
		return
	}
	status, _ := getNodeReadyStatus(node)
	n.logNodeDetails("added", node, status)
	summary := NewNodeSummary(node)
	// nil old summary signals a brand-new node to the metrics layer.
	monitoring.UpdateNodeMetrics(nil, &summary)
}

// onUpdate handles node-update events. It logs the new node state and hands
// both the old and new summaries to the metrics layer so gauges can be moved
// from the previous state to the current one.
//
// Fixes: the rendered block still contained the pre-refactor logging path
// (an unchecked `node := newObj.(*v1.Node)` plus the old Info/Debug lines)
// merged with the new checked path — only the checked path is kept.
func (n *NodeMonitorController) onUpdate(oldObj, newObj interface{}) {
	oldNode, ok1 := oldObj.(*v1.Node)
	newNode, ok2 := newObj.(*v1.Node)
	if !ok1 || !ok2 {
		n.Logger.Error("Failed to convert obj to Node")
		return
	}
	status, _ := getNodeReadyStatus(newNode)
	n.logNodeDetails("updated", newNode, status)
	oldSummary := NewNodeSummary(oldNode)
	newSummary := NewNodeSummary(newNode)
	monitoring.UpdateNodeMetrics(&oldSummary, &newSummary)
}

func (n *NodeMonitorController) onDelete(obj interface{}) {
Expand All @@ -92,24 +89,36 @@ func (n *NodeMonitorController) onDelete(obj interface{}) {
fmt.Println("Delete event with incorrect type:", obj)
return
}
status, msg := getNodeReadyStatus(node)
n.Logger.Info("Node deleted:", node.Name, msg)
n.Logger.Debug("Node Status: ", status)
n.Logger.Debug("Node Conditions: ", node.Status.Conditions)
n.Logger.Debug("Node Annotations: ", node.Annotations)
n.Logger.Debug("Node Labels: ", node.Labels)
n.Logger.Debug("Node Finalizers: ", node.Finalizers)
n.Logger.Debug("Node Taints: ", node.Spec.Taints)
n.Logger.Debug("Node Addresses: ", node.Status.Addresses)
n.Logger.Debug("Node Capacity: ", node.Status.Capacity)
n.Logger.Debug("Node Allocatable: ", node.Status.Allocatable)
n.Logger.Debug("Node Conditions: ", node.Status.Conditions)
status, _ := getNodeReadyStatus(node)
n.logNodeDetails("deleted", node, status)
summary := NewNodeSummary(node)
monitoring.UpdateNodeMetrics(&summary, nil)
}

// NewNodeSummary builds a monitoring.NodeSummary snapshot from the given
// node, capturing its identity, ready status, conditions, metadata, taints,
// and resource totals for the metrics layer.
func NewNodeSummary(node *v1.Node) monitoring.NodeSummary {
	ready, _ := getNodeReadyStatus(node)
	summary := monitoring.NodeSummary{
		Name:        node.Name,
		ReadyStatus: ready,
		Conditions:  node.Status.Conditions,
		Annotations: node.Annotations,
		Labels:      node.Labels,
		Taints:      node.Spec.Taints,
		Capacity:    node.Status.Capacity,
		Allocatable: node.Status.Allocatable,
	}
	return summary
}

// getNodeReadyStatus is a helper function to extract the ready status from a node
func getNodeReadyStatus(node *v1.Node) (v1.ConditionStatus, string) {
for _, condition := range node.Status.Conditions {
if condition.Type == v1.NodeReady {
return condition.Status, "Ready"
if condition.Status == v1.ConditionTrue {
return condition.Status, "Ready"
} else {
return condition.Status, "NotReady"
}
}
}
return v1.ConditionUnknown, "Unknown"
Expand Down
9 changes: 0 additions & 9 deletions controllers/node_monitor_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@ import (
"k8s.io/client-go/rest"
)

type MockClusterConnectionFactory struct {
mock.Mock
}

func (m *MockClusterConnectionFactory) NewForConfig(config *rest.Config) (*fake.Clientset, error) {
args := m.Called(config)
return args.Get(0).(*fake.Clientset), args.Error(1)
}

func TestNodeMonitorControllerStart(t *testing.T) {
// Create a fake clientset
fakeClientset := fake.NewSimpleClientset()
Expand Down
106 changes: 106 additions & 0 deletions controllers/pod_monitor_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package controllers

import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/supporttools/KubeWatchman/monitoring"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

// NewPodMonitorController constructs a PodMonitorController with a
// controller-tagged logger and initialized internal maps.
//
// Fix: the PodSummary map declared on the struct was left nil; writing to a
// nil map panics, so it is initialized here alongside podStatuses.
func NewPodMonitorController(clientset KubernetesClientset, logger *logrus.Logger) *PodMonitorController {
	return &PodMonitorController{
		Clientset:   clientset,
		Logger:      logger.WithField("controller", "PodMonitorController"),
		PodSummary:  make(map[string]*monitoring.PodSummary),
		podStatuses: make(map[string]int),
	}
}

// Start marks the controller up, registers pod event handlers on a shared
// informer, launches the informer goroutine, and blocks until its cache has
// synced. Returns an error if the sync times out.
//
// Fixes: log calls went through the package-level logrus logger, bypassing
// the controller-tagged n.Logger used everywhere else in this type; the
// local `PodInformer` is renamed to idiomatic lowerCamel.
func (n *PodMonitorController) Start() error {
	monitoring.ControllerStatus("PodMonitorController", true)
	// Resync every minute so missed events are eventually reconciled.
	factory := informers.NewSharedInformerFactory(n.Clientset, time.Minute)
	podInformer := factory.Core().V1().Pods().Informer()

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    n.onAdd,
		UpdateFunc: n.onUpdate,
		DeleteFunc: n.onDelete,
	})

	n.stopCh = make(chan struct{})
	go podInformer.Run(n.stopCh)

	if !cache.WaitForCacheSync(n.stopCh, podInformer.HasSynced) {
		n.Logger.Error("Timed out waiting for caches to sync")
		return fmt.Errorf("timed out waiting for caches to sync")
	}

	n.Logger.Debug("Pod informer synced successfully")
	return nil
}

// Stop marks the controller down and signals the informer goroutine to exit
// by closing the stop channel.
//
// Fix: closing a nil channel (Stop before Start) or an already-closed one
// (double Stop) panics; guard and clear the channel so Stop is idempotent.
func (n *PodMonitorController) Stop() error {
	monitoring.ControllerStatus("PodMonitorController", false)
	if n.stopCh != nil {
		close(n.stopCh)
		n.stopCh = nil
	}
	return nil
}

// logPodDetails writes a one-line Info entry for the event (with the pod's
// name and phase) followed by a verbose Debug dump of the pod's state.
//
// Fix: the parameter was named `Pod` — Go parameters use lowerCamel; renamed
// to `pod` (parameter names are not part of the callers' interface).
func (n *PodMonitorController) logPodDetails(pod *v1.Pod, event string) {
	n.Logger.Infof("Pod %s: %s (status: %s)", event, pod.Name, pod.Status.Phase)
	n.Logger.Debug("Pod Details: ", pod.Status, pod.Status.Conditions, pod.Annotations, pod.Labels, pod.Finalizers, pod.Status.PodIP)
}

// onAdd handles pod-add events: logs the pod, bumps the total pod count, and
// increments the gauge for the pod's current phase.
//
// Fix: the bare type assertion would panic the informer goroutine on an
// unexpected object; check it like the sibling onDelete does.
func (n *PodMonitorController) onAdd(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		n.Logger.Error("Add event with incorrect type:", obj)
		return
	}
	n.logPodDetails(pod, "added")
	monitoring.PodCountChange("inc")
	status := string(pod.Status.Phase)
	n.podStatuses[status]++
	monitoring.UpdatePodStatus(status, float64(n.podStatuses[status]))
}

// onUpdate handles pod-update events: it moves the pod between per-phase
// gauges and re-derives the crash-loop gauge from the old and new summaries.
//
// Fix: both type assertions were unchecked and would panic the informer
// goroutine on an unexpected object; check them like the sibling onDelete.
func (n *PodMonitorController) onUpdate(oldObj, newObj interface{}) {
	oldPod, ok1 := oldObj.(*v1.Pod)
	newPod, ok2 := newObj.(*v1.Pod)
	if !ok1 || !ok2 {
		n.Logger.Error("Update event with incorrect type")
		return
	}
	n.logPodDetails(newPod, "updated")

	// Move the pod from its old phase bucket to the new one and publish both.
	oldStatus := string(oldPod.Status.Phase)
	newStatus := string(newPod.Status.Phase)
	n.podStatuses[oldStatus]--
	n.podStatuses[newStatus]++
	monitoring.UpdatePodStatus(oldStatus, float64(n.podStatuses[oldStatus]))
	monitoring.UpdatePodStatus(newStatus, float64(n.podStatuses[newStatus]))

	oldSummary := monitoring.CreatePodSummary(oldPod)
	newSummary := monitoring.CreatePodSummary(newPod)

	// Subtract the old pod's crash-looping containers, add the new pod's,
	// then publish the resulting running total.
	for _, container := range oldSummary.Containers {
		if container.IsCrashLooping {
			n.CrashLoopingCount--
		}
	}
	for _, container := range newSummary.Containers {
		if container.IsCrashLooping {
			n.CrashLoopingCount++
		}
	}

	monitoring.UpdatePodStatus("CrashLoopBackOff", float64(n.CrashLoopingCount))
}

// onDelete handles pod-delete events: logs the final pod state, decrements
// the total pod count, and decrements the gauge for the pod's last phase.
//
// Fixes: the type-mismatch branch printed with fmt.Println, bypassing the
// controller's structured logger used by every other log line; the `Pod`
// local is renamed to idiomatic lowerCamel.
func (n *PodMonitorController) onDelete(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		// NOTE(review): deletes can arrive as cache.DeletedFinalStateUnknown
		// tombstones; consider unwrapping instead of dropping — verify.
		n.Logger.Error("Delete event with incorrect type:", obj)
		return
	}
	n.logPodDetails(pod, "deleted")
	monitoring.PodCountChange("dec")
	status := string(pod.Status.Phase)
	n.podStatuses[status]--
	monitoring.UpdatePodStatus(status, float64(n.podStatuses[status]))
}
39 changes: 39 additions & 0 deletions controllers/pod_monitor_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package controllers_test

import (
"testing"

"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/mock"
"github.com/supporttools/KubeWatchman/controllers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
)

// TestPodMonitorControllerStart verifies that a PodMonitorController built
// from a mocked connection factory starts (informer cache syncs) and stops
// cleanly against a fake clientset.
//
// Fixes: on clientset-creation failure the test used t.Errorf and kept
// running with a nil clientset (guaranteed panic) — now t.Fatalf; the
// controller is also stopped so the informer goroutine is not leaked into
// subsequent tests.
func TestPodMonitorControllerStart(t *testing.T) {
	// Create a fake clientset
	fakeClientset := fake.NewSimpleClientset()

	// Create a mock factory
	mockFactory := new(MockClusterConnectionFactory)
	mockFactory.On("NewForConfig", mock.Anything).Return(fakeClientset, nil)

	// Use the mock factory to create a Clientset
	clientset, err := mockFactory.NewForConfig(&rest.Config{})
	if err != nil {
		t.Fatalf("Failed to create clientset: %v", err)
	}

	// Create a logger and hook for testing
	logger, _ := test.NewNullLogger()

	// Create the PodMonitorController using the clientset
	controller := controllers.NewPodMonitorController(clientset, logger)

	// Start the controller
	if err := controller.Start(); err != nil {
		t.Errorf("Failed to start controller: %v", err)
	}

	// Shut the informer goroutine down so the test does not leak it.
	if err := controller.Stop(); err != nil {
		t.Errorf("Failed to stop controller: %v", err)
	}
}
17 changes: 14 additions & 3 deletions controllers/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"github.com/sirupsen/logrus"
"github.com/supporttools/KubeWatchman/monitoring"
"k8s.io/client-go/kubernetes"
)

Expand All @@ -10,7 +11,17 @@ type KubernetesClientset interface {
}

// NodeMonitorController watches Node objects via a shared informer and
// publishes node metrics through the monitoring package.
//
// Fix: the rendered struct contained both the pre- and post-change field
// lists merged together, duplicating Clientset/Logger/stopCh (a compile
// error); only the post-change field set is kept.
type NodeMonitorController struct {
	Clientset   KubernetesClientset                // cluster API access (interface so tests can inject fakes)
	Logger      *logrus.Entry                      // controller-scoped structured logger
	stopCh      chan struct{}                      // closed on Stop to end the informer goroutine
	NodeSummary map[string]*monitoring.NodeSummary // per-node summaries — presumably keyed by node name; TODO confirm (unused in visible code)
}

// PodMonitorController watches Pod objects via a shared informer and
// maintains pod-count, per-phase, and crash-loop metrics.
type PodMonitorController struct {
	Clientset         KubernetesClientset               // cluster API access (interface so tests can inject fakes)
	Logger            *logrus.Entry                     // controller-scoped structured logger
	stopCh            chan struct{}                     // closed on Stop to end the informer goroutine
	PodSummary        map[string]*monitoring.PodSummary // per-pod summaries — presumably keyed by pod name; TODO confirm (unused in visible code)
	podStatuses       map[string]int                    // running count of pods per phase string (see onAdd/onUpdate/onDelete)
	CrashLoopingCount int                               // running count of crash-looping containers (adjusted in onUpdate)
}
16 changes: 16 additions & 0 deletions controllers/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package controllers_test

import (
"github.com/stretchr/testify/mock"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
)

// MockClusterConnectionFactory is a testify mock standing in for the factory
// that builds a Kubernetes clientset from a rest.Config, letting tests hand
// back a fake clientset.
type MockClusterConnectionFactory struct {
	mock.Mock
}

// NewForConfig records the call with testify and returns the canned fake
// clientset and error configured via mockFactory.On("NewForConfig", ...).
func (m *MockClusterConnectionFactory) NewForConfig(config *rest.Config) (*fake.Clientset, error) {
	args := m.Called(config)
	return args.Get(0).(*fake.Clientset), args.Error(1)
}
5 changes: 2 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ func main() {
logrus.Fatalf("Failed to test cluster connection: %v", err)
}

nodeMonitorController := controllers.NewNodeMonitorController(clientset, logger)

controllersList := []controllers.Controller{
nodeMonitorController,
controllers.NewNodeMonitorController(clientset, logger),
controllers.NewPodMonitorController(clientset, logger),
}

for _, controller := range controllersList {
Expand Down
Loading

0 comments on commit 2a58e9d

Please sign in to comment.