diff --git a/cmd/aws-k8s-agent/main.go b/cmd/aws-k8s-agent/main.go index f379f552f6..22720304a8 100644 --- a/cmd/aws-k8s-agent/main.go +++ b/cmd/aws-k8s-agent/main.go @@ -77,6 +77,9 @@ func _main() int { // Pool manager go ipamContext.StartNodeIPPoolManager() + // Pod IP allocation reconcile loop to clear up dangling pod IPs. + go ipamContext.PodIPAllocationReconcileLoop() + if !utils.GetBoolAsStringEnvVar(envDisableMetrics, false) { // Prometheus metrics go metrics.ServeMetrics(metricsPort) diff --git a/pkg/ipamd/datastore/data_store.go b/pkg/ipamd/datastore/data_store.go index 02d5cd21f0..92d7162978 100644 --- a/pkg/ipamd/datastore/data_store.go +++ b/pkg/ipamd/datastore/data_store.go @@ -1457,6 +1457,45 @@ func (ds *DataStore) normalizeCheckpointDataByPodVethExistence(checkpoint Checkp return checkpoint, nil } +// ValidateAssignedIPsByPodVethExistence validates all assigned IPs by checking the existence of the corresponding pod veth. +func (ds *DataStore) ValidateAssignedIPsByPodVethExistence() error { + ds.lock.Lock() + defer ds.lock.Unlock() + + hostNSLinks, err := ds.netLink.LinkList() + if err != nil { + return err + } + + var staleAllocations []CheckpointEntry + for _, eni := range ds.eniPool { + for _, assignedAddr := range eni.AvailableIPv4Cidrs { + for _, addr := range assignedAddr.IPAddresses { + if addr.Assigned() { + allocation := CheckpointEntry{ + IPAMKey: addr.IPAMKey, + IPv4: addr.Address, + AllocationTimestamp: addr.AssignedTime.UnixNano(), + Metadata: addr.IPAMMetadata, + } + if err := ds.validateAllocationByPodVethExistence(allocation, hostNSLinks); err != nil { + ds.log.Warnf("stale IP allocation for ID(%v): IPv4(%v) due to %v", allocation.ContainerID, allocation.IPv4, err) + staleAllocations = append(staleAllocations, allocation) + ds.unassignPodIPAddressUnsafe(addr) + } + } + } + } + } + + // Stale allocations may have dangling IP rules that need cleanup + if len(staleAllocations) > 0 { + ds.PruneStaleAllocations(staleAllocations) + } + + return nil +} + func (ds *DataStore) validateAllocationByPodVethExistence(allocation CheckpointEntry, hostNSLinks []netlink.Link) error { // for backwards compatibility, we skip the validation when metadata contains empty namespace/name. if allocation.Metadata.K8SPodNamespace == "" || allocation.Metadata.K8SPodName == "" { @@ -1466,6 +1505,7 @@ func (ds *DataStore) validateAllocationByPodVethExistence(allocation CheckpointE linkNameSuffix := networkutils.GeneratePodHostVethNameSuffix(allocation.Metadata.K8SPodNamespace, allocation.Metadata.K8SPodName) for _, link := range hostNSLinks { linkName := link.Attrs().Name + fmt.Printf("linkName: %v, linkNameSuffix: %v\n", linkName, linkNameSuffix) if strings.HasSuffix(linkName, linkNameSuffix) { return nil } diff --git a/pkg/ipamd/datastore/data_store_test.go b/pkg/ipamd/datastore/data_store_test.go index 3efd872dbd..94ca2747c7 100644 --- a/pkg/ipamd/datastore/data_store_test.go +++ b/pkg/ipamd/datastore/data_store_test.go @@ -1615,3 +1615,77 @@ func TestForceRemovalMetrics(t *testing.T) { eniCount = testutil.ToFloat64(prometheusmetrics.ForceRemovedENIs) assert.Equal(t, float64(1), eniCount) } + +func TestValidateAssignedIPsByPodVethExistence(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNetLink := mock_netlinkwrapper.NewMockNetLink(ctrl) + ds := &DataStore{ + eniPool: make(ENIPool), + log: Testlog, + netLink: mockNetLink, + backingStore: NullCheckpoint{}, + ipCooldownPeriod: getCooldownPeriod(), + } + + deviceName := "eni-" + networkutils.GeneratePodHostVethNameSuffix("default","sample-pod-1") + // Mock the netlink LinkList call + mockNetLink.EXPECT().LinkList().Return([]netlink.Link{ + &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: deviceName}}, + &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "veth1"}}, + }, nil) + + // Add ENIs and IPs to the datastore + err := ds.AddENI("eni-1", 1, true, false, false) + assert.NoError(t, err) + + ipv4Addr := net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.IPv4Mask(255, 255, 255, 255)} + err = ds.AddIPv4CidrToStore("eni-1", ipv4Addr, false) + assert.NoError(t, err) + + key := IPAMKey{"net0", "sandbox-1", deviceName} + ipamMetadata := IPAMMetadata{K8SPodNamespace: "default", K8SPodName: "sample-pod-1"} + _, _, err = ds.AssignPodIPv4Address(key, ipamMetadata) + assert.NoError(t, err) + + // Validate assigned IPs by pod veth existence + err = ds.ValidateAssignedIPsByPodVethExistence() + assert.NoError(t, err) + + // Check if the IP is still assigned + eni, _, addr := ds.eniPool.FindAddressForSandbox(key) + assert.NotNil(t, addr) + assert.Equal(t, "1.1.1.1", addr.Address) + assert.Equal(t, eni.DeviceNumber, 1) + + mockNetLink.EXPECT().NewRule().DoAndReturn(func() *netlink.Rule { return netlink.NewRule() }).Times(2) + + // Mock the netlink RuleList call to simulate stale rules + toContainerRule := netlink.NewRule() + toContainerRule.Dst = &net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.CIDRMask(32, 32)} + toContainerRule.Priority = networkutils.ToContainerRulePriority + toContainerRule.Table = unix.RT_TABLE_MAIN + + fromContainerRule := netlink.NewRule() + fromContainerRule.Src = &net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.CIDRMask(32, 32)} + fromContainerRule.Priority = networkutils.FromPodRulePriority + fromContainerRule.Table = unix.RT_TABLE_UNSPEC + + // Mock the netlink RuleDel call to remove stale rules + mockNetLink.EXPECT().RuleDel(toContainerRule).Return(nil) + mockNetLink.EXPECT().RuleDel(fromContainerRule).Return(nil) + // Mock the netlink LinkList call, omit the device that is associated with + // the IP to simulate stale veth. + mockNetLink.EXPECT().LinkList().Return([]netlink.Link{ + &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "veth1"}}, + }, nil) + + // Validate assigned IPs by pod veth existence again + err = ds.ValidateAssignedIPsByPodVethExistence() + assert.NoError(t, err) + + // Check if the IP is unassigned + _, _, addr = ds.eniPool.FindAddressForSandbox(key) + assert.Nil(t, addr) +} \ No newline at end of file diff --git a/pkg/ipamd/ipamd.go b/pkg/ipamd/ipamd.go index 588bc3870a..ac6850019e 100644 --- a/pkg/ipamd/ipamd.go +++ b/pkg/ipamd/ipamd.go @@ -61,6 +61,10 @@ const ( nodeIPPoolReconcileInterval = 60 * time.Second decreaseIPPoolInterval = 30 * time.Second + // podIPAllocationReconcileInterval is the interval at which the IPAM + // controller reconciles the IP address allocation for pods. + podIPAllocationReconcileInterval = 5 * time.Second + // ipReconcileCooldown is the amount of time that an IP address must wait until it can be added to the data store // during reconciliation after being discovered on the EC2 instance metadata. ipReconcileCooldown = 60 * time.Second @@ -637,6 +641,16 @@ func (c *IPAMContext) updateIPStats(unmanaged int) { prometheusmetrics.EnisMax.Set(float64(c.maxENI - unmanaged)) } +// reconcilePodIPAllocations reconciles IPAM's datastore's view of allocated +// IPs with the actual IPs assigned to pods. +func (c *IPAMContext) PodIPAllocationReconcileLoop() { + const sleepDuration = podIPAllocationReconcileInterval + for { + time.Sleep(sleepDuration) + c.dataStore.ValidateAssignedIPsByPodVethExistence() + } +} + // StartNodeIPPoolManager monitors the IP pool, add or del them when it is required. func (c *IPAMContext) StartNodeIPPoolManager() { // For IPv6, if Security Groups for Pods is enabled, wait until trunk ENI is attached and add it to the datastore.