From 9d4cc1d03320b0e42cbe7b805cfca0d1ba5a2d99 Mon Sep 17 00:00:00 2001 From: Bhasker Hariharan Date: Wed, 13 Nov 2024 17:44:18 +0000 Subject: [PATCH] Add support to reconcile allocated Pod IPs. The CNI today only reconciles its datastore with existing pods at startup but never again. Sometimes it is possible that IPAMD goes out of sync with the kubelet's view of the pods running on the node if it fails or is temporarily unreachable by the CNI plugin handling the DelNetwork call from the container runtime. In such cases the CNI continues to consider the pod's IP allocated and will not free it as it will never see a DelNetwork again. This results in CNI failing to assign IPs to new pods. This change adds a reconcile loop which periodically (once a minute) reconciles its allocated IPs with the existence of the pods' veth devices. If the veth device is not found then it frees up the corresponding allocation making the IP available for reuse. Fixes #3109 --- cmd/aws-k8s-agent/main.go | 3 ++ pkg/ipamd/datastore/data_store.go | 40 ++++++++++++++ pkg/ipamd/datastore/data_store_test.go | 74 ++++++++++++++++++++++++++ pkg/ipamd/ipamd.go | 14 +++++ 4 files changed, 131 insertions(+) diff --git a/cmd/aws-k8s-agent/main.go b/cmd/aws-k8s-agent/main.go index f379f552f6..22720304a8 100644 --- a/cmd/aws-k8s-agent/main.go +++ b/cmd/aws-k8s-agent/main.go @@ -77,6 +77,9 @@ func _main() int { // Pool manager go ipamContext.StartNodeIPPoolManager() + // Pod IP allocation reconcile loop to clear up dangling pod IPs. 
+ go ipamContext.PodIPAllocationReconcileLoop() + if !utils.GetBoolAsStringEnvVar(envDisableMetrics, false) { // Prometheus metrics go metrics.ServeMetrics(metricsPort) diff --git a/pkg/ipamd/datastore/data_store.go b/pkg/ipamd/datastore/data_store.go index 02d5cd21f0..92d7162978 100644 --- a/pkg/ipamd/datastore/data_store.go +++ b/pkg/ipamd/datastore/data_store.go @@ -1457,6 +1457,45 @@ func (ds *DataStore) normalizeCheckpointDataByPodVethExistence(checkpoint Checkp return checkpoint, nil } +// ValidateAssignedIPsByPodVethExistence validates all assigned IPs by checking the existence of the corresponding pod veth. +func (ds *DataStore) ValidateAssignedIPsByPodVethExistence() error { + ds.lock.Lock() + defer ds.lock.Unlock() + + hostNSLinks, err := ds.netLink.LinkList() + if err != nil { + return err + } + + var staleAllocations []CheckpointEntry + for _, eni := range ds.eniPool { + for _, assignedAddr := range eni.AvailableIPv4Cidrs { + for _, addr := range assignedAddr.IPAddresses { + if addr.Assigned() { + allocation := CheckpointEntry{ + IPAMKey: addr.IPAMKey, + IPv4: addr.Address, + AllocationTimestamp: addr.AssignedTime.UnixNano(), + Metadata: addr.IPAMMetadata, + } + if err := ds.validateAllocationByPodVethExistence(allocation, hostNSLinks); err != nil { + ds.log.Warnf("stale IP allocation for ID(%v): IPv4(%v) due to %v", allocation.ContainerID, allocation.IPv4, err) + staleAllocations = append(staleAllocations, allocation) + ds.unassignPodIPAddressUnsafe(addr) + } + } + } + } + } + + // Stale allocations may have dangling IP rules that need cleanup + if len(staleAllocations) > 0 { + ds.PruneStaleAllocations(staleAllocations) + } + + return nil +} + func (ds *DataStore) validateAllocationByPodVethExistence(allocation CheckpointEntry, hostNSLinks []netlink.Link) error { // for backwards compatibility, we skip the validation when metadata contains empty namespace/name. 
if allocation.Metadata.K8SPodNamespace == "" || allocation.Metadata.K8SPodName == "" { @@ -1466,6 +1505,7 @@ func (ds *DataStore) validateAllocationByPodVethExistence(allocation CheckpointE linkNameSuffix := networkutils.GeneratePodHostVethNameSuffix(allocation.Metadata.K8SPodNamespace, allocation.Metadata.K8SPodName) for _, link := range hostNSLinks { linkName := link.Attrs().Name + fmt.Printf("linkName: %v, linkNameSuffix: %v\n", linkName, linkNameSuffix) if strings.HasSuffix(linkName, linkNameSuffix) { return nil } diff --git a/pkg/ipamd/datastore/data_store_test.go b/pkg/ipamd/datastore/data_store_test.go index 3efd872dbd..94ca2747c7 100644 --- a/pkg/ipamd/datastore/data_store_test.go +++ b/pkg/ipamd/datastore/data_store_test.go @@ -1615,3 +1615,77 @@ func TestForceRemovalMetrics(t *testing.T) { eniCount = testutil.ToFloat64(prometheusmetrics.ForceRemovedENIs) assert.Equal(t, float64(1), eniCount) } + +func TestValidateAssignedIPsByPodVethExistence(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNetLink := mock_netlinkwrapper.NewMockNetLink(ctrl) + ds := &DataStore{ + eniPool: make(ENIPool), + log: Testlog, + netLink: mockNetLink, + backingStore: NullCheckpoint{}, + ipCooldownPeriod: getCooldownPeriod(), + } + + deviceName := "eni-" + networkutils.GeneratePodHostVethNameSuffix("default","sample-pod-1") + // Mock the netlink LinkList call + mockNetLink.EXPECT().LinkList().Return([]netlink.Link{ + &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: deviceName}}, + &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "veth1"}}, + }, nil) + + // Add ENIs and IPs to the datastore + err := ds.AddENI("eni-1", 1, true, false, false) + assert.NoError(t, err) + + ipv4Addr := net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.IPv4Mask(255, 255, 255, 255)} + err = ds.AddIPv4CidrToStore("eni-1", ipv4Addr, false) + assert.NoError(t, err) + + key := IPAMKey{"net0", "sandbox-1", deviceName} + ipamMetadata := IPAMMetadata{K8SPodNamespace: "default", 
K8SPodName: "sample-pod-1"} + _, _, err = ds.AssignPodIPv4Address(key, ipamMetadata) + assert.NoError(t, err) + + // Validate assigned IPs by pod veth existence + err = ds.ValidateAssignedIPsByPodVethExistence() + assert.NoError(t, err) + + // Check if the IP is still assigned + eni, _, addr := ds.eniPool.FindAddressForSandbox(key) + assert.NotNil(t, addr) + assert.Equal(t, "1.1.1.1", addr.Address) + assert.Equal(t, eni.DeviceNumber, 1) + + mockNetLink.EXPECT().NewRule().DoAndReturn(func() *netlink.Rule { return netlink.NewRule() }).Times(2) + + // Mock the netlink RuleList call to simulate stale rules + toContainerRule := netlink.NewRule() + toContainerRule.Dst = &net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.CIDRMask(32, 32)} + toContainerRule.Priority = networkutils.ToContainerRulePriority + toContainerRule.Table = unix.RT_TABLE_MAIN + + fromContainerRule := netlink.NewRule() + fromContainerRule.Src = &net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.CIDRMask(32, 32)} + fromContainerRule.Priority = networkutils.FromPodRulePriority + fromContainerRule.Table = unix.RT_TABLE_UNSPEC + + // Mock the netlink RuleDel call to remove stale rules + mockNetLink.EXPECT().RuleDel(toContainerRule).Return(nil) + mockNetLink.EXPECT().RuleDel(fromContainerRule).Return(nil) + // Mock the netlink LinkList call, omit the device that is associated with + // the IP to simulate stale veth. 
+ mockNetLink.EXPECT().LinkList().Return([]netlink.Link{ + &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "veth1"}}, + }, nil) + + // Validate assigned IPs by pod veth existence again + err = ds.ValidateAssignedIPsByPodVethExistence() + assert.NoError(t, err) + + // Check if the IP is unassigned + _, _, addr = ds.eniPool.FindAddressForSandbox(key) + assert.Nil(t, addr) +} \ No newline at end of file diff --git a/pkg/ipamd/ipamd.go b/pkg/ipamd/ipamd.go index 588bc3870a..cbd660ed57 100644 --- a/pkg/ipamd/ipamd.go +++ b/pkg/ipamd/ipamd.go @@ -61,6 +61,10 @@ const ( nodeIPPoolReconcileInterval = 60 * time.Second decreaseIPPoolInterval = 30 * time.Second + // podIPAllocationReconcileInterval is the interval at which the IPAM + // controller reconciles the IP address allocation for pods. + podIPAllocationReconcileInterval = 1 * time.Minute + // ipReconcileCooldown is the amount of time that an IP address must wait until it can be added to the data store // during reconciliation after being discovered on the EC2 instance metadata. ipReconcileCooldown = 60 * time.Second @@ -637,6 +641,16 @@ func (c *IPAMContext) updateIPStats(unmanaged int) { prometheusmetrics.EnisMax.Set(float64(c.maxENI - unmanaged)) } +// PodIPAllocationReconcileLoop reconciles IPAM's datastore's view of allocated +// IPs with the actual IPs assigned to pods. +func (c *IPAMContext) PodIPAllocationReconcileLoop() { + const sleepDuration = podIPAllocationReconcileInterval + for { + time.Sleep(sleepDuration) + c.dataStore.ValidateAssignedIPsByPodVethExistence() + } +} + // StartNodeIPPoolManager monitors the IP pool, add or del them when it is required. func (c *IPAMContext) StartNodeIPPoolManager() { // For IPv6, if Security Groups for Pods is enabled, wait until trunk ENI is attached and add it to the datastore.