Skip to content

Commit

Permalink
Add support to reconcile allocated Pod IPs.
Browse files Browse the repository at this point in the history
The CNI today only reconciles its datastore with existing pods at
startup but never again. Sometimes it's possible that IPAMD goes
out of sync with the kubelet's view of the pods running on the
node if it fails or is temporarily unreachable by the CNI plugin
handling the DelNetwork call from the container runtime.

In such cases the CNI continues to consider the pod's IP allocated
and will not free it, as it will never see a DelNetwork again. This
results in the CNI failing to assign IPs to new pods.

This change adds a reconcile loop which periodically (once a minute)
reconciles its allocated IPs with the existence of pods' veth devices. If
the veth device is not found then it frees up the corresponding
allocation, making the IP available for reuse.

Fixes aws#3109
  • Loading branch information
Bhasker Hariharan committed Dec 4, 2024
1 parent 1b631d2 commit b8af90d
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 0 deletions.
3 changes: 3 additions & 0 deletions cmd/aws-k8s-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func _main() int {
// Pool manager
go ipamContext.StartNodeIPPoolManager()

// Pod IP allocation reconcile loop to clear up dangling pod IPs.
go ipamContext.PodIPAllocationReconcileLoop()

if !utils.GetBoolAsStringEnvVar(envDisableMetrics, false) {
// Prometheus metrics
go metrics.ServeMetrics(metricsPort)
Expand Down
40 changes: 40 additions & 0 deletions pkg/ipamd/datastore/data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,45 @@ func (ds *DataStore) normalizeCheckpointDataByPodVethExistence(checkpoint Checkp
return checkpoint, nil
}

// ValidateAssignedIPsByPodVethExistence walks every assigned address under the
// ENIs' AvailableIPv4Cidrs and checks it against the links returned by
// netLink.LinkList. An address whose allocation fails
// validateAllocationByPodVethExistence is logged as stale, unassigned in the
// datastore, and then passed to PruneStaleAllocations, since stale allocations
// may have dangling IP rules that need cleanup.
//
// The datastore lock is held for the whole pass. The only error returned is a
// failure to list the links; finding stale allocations is not an error.
func (ds *DataStore) ValidateAssignedIPsByPodVethExistence() error {
	ds.lock.Lock()
	defer ds.lock.Unlock()

	links, listErr := ds.netLink.LinkList()
	if listErr != nil {
		return listErr
	}

	var stale []CheckpointEntry
	for _, eni := range ds.eniPool {
		for _, cidr := range eni.AvailableIPv4Cidrs {
			for _, addr := range cidr.IPAddresses {
				// Only addresses currently handed out to a pod are candidates.
				if !addr.Assigned() {
					continue
				}

				entry := CheckpointEntry{
					IPAMKey:             addr.IPAMKey,
					IPv4:                addr.Address,
					AllocationTimestamp: addr.AssignedTime.UnixNano(),
					Metadata:            addr.IPAMMetadata,
				}

				vethErr := ds.validateAllocationByPodVethExistence(entry, links)
				if vethErr == nil {
					continue
				}

				// Record the entry before unassigning: it is built from the
				// address's current key and metadata.
				ds.log.Warnf("stale IP allocation for ID(%v): IPv4(%v) due to %v", entry.ContainerID, entry.IPv4, vethErr)
				stale = append(stale, entry)
				ds.unassignPodIPAddressUnsafe(addr)
			}
		}
	}

	if len(stale) == 0 {
		return nil
	}

	// Stale allocations may have dangling IP rules that need cleanup.
	ds.PruneStaleAllocations(stale)
	return nil
}

func (ds *DataStore) validateAllocationByPodVethExistence(allocation CheckpointEntry, hostNSLinks []netlink.Link) error {
// for backwards compatibility, we skip the validation when metadata contains empty namespace/name.
if allocation.Metadata.K8SPodNamespace == "" || allocation.Metadata.K8SPodName == "" {
Expand All @@ -1466,6 +1505,7 @@ func (ds *DataStore) validateAllocationByPodVethExistence(allocation CheckpointE
linkNameSuffix := networkutils.GeneratePodHostVethNameSuffix(allocation.Metadata.K8SPodNamespace, allocation.Metadata.K8SPodName)
for _, link := range hostNSLinks {
linkName := link.Attrs().Name
fmt.Printf("linkName: %v, linkNameSuffix: %v\n", linkName, linkNameSuffix)
if strings.HasSuffix(linkName, linkNameSuffix) {
return nil
}
Expand Down
74 changes: 74 additions & 0 deletions pkg/ipamd/datastore/data_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,3 +1615,77 @@ func TestForceRemovalMetrics(t *testing.T) {
eniCount = testutil.ToFloat64(prometheusmetrics.ForceRemovedENIs)
assert.Equal(t, float64(1), eniCount)
}

// TestValidateAssignedIPsByPodVethExistence runs the validation pass twice
// against a datastore holding one assigned IP: first with the pod's host veth
// present in the mocked link list (the IP must stay assigned), then with it
// absent (the IP must be unassigned and both IP rules deleted).
func TestValidateAssignedIPsByPodVethExistence(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockNetLink := mock_netlinkwrapper.NewMockNetLink(ctrl)
	ds := &DataStore{
		eniPool:          make(ENIPool),
		log:              Testlog,
		netLink:          mockNetLink,
		backingStore:     NullCheckpoint{},
		ipCooldownPeriod: getCooldownPeriod(),
	}

	deviceName := "eni-" + networkutils.GeneratePodHostVethNameSuffix("default", "sample-pod-1")
	// Mock the netlink LinkList call
	mockNetLink.EXPECT().LinkList().Return([]netlink.Link{
		&netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: deviceName}},
		&netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "veth1"}},
	}, nil)

	// Add ENIs and IPs to the datastore
	err := ds.AddENI("eni-1", 1, true, false, false)
	assert.NoError(t, err)

	ipv4Addr := net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.IPv4Mask(255, 255, 255, 255)}
	err = ds.AddIPv4CidrToStore("eni-1", ipv4Addr, false)
	assert.NoError(t, err)

	key := IPAMKey{"net0", "sandbox-1", deviceName}
	ipamMetadata := IPAMMetadata{K8SPodNamespace: "default", K8SPodName: "sample-pod-1"}
	_, _, err = ds.AssignPodIPv4Address(key, ipamMetadata)
	assert.NoError(t, err)

	// Validate assigned IPs by pod veth existence
	err = ds.ValidateAssignedIPsByPodVethExistence()
	assert.NoError(t, err)

	// Check if the IP is still assigned
	eni, _, addr := ds.eniPool.FindAddressForSandbox(key)
	if !assert.NotNil(t, addr) || !assert.NotNil(t, eni) {
		// assert.NotNil does not stop the test; abort here so the
		// dereferences below cannot panic on a failed lookup.
		t.FailNow()
	}
	assert.Equal(t, "1.1.1.1", addr.Address)
	// Expected value goes first so failure output labels the values correctly.
	assert.Equal(t, 1, eni.DeviceNumber)

	mockNetLink.EXPECT().NewRule().DoAndReturn(func() *netlink.Rule { return netlink.NewRule() }).Times(2)

	// Mock the netlink RuleList call to simulate stale rules
	toContainerRule := netlink.NewRule()
	toContainerRule.Dst = &net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.CIDRMask(32, 32)}
	toContainerRule.Priority = networkutils.ToContainerRulePriority
	toContainerRule.Table = unix.RT_TABLE_MAIN

	fromContainerRule := netlink.NewRule()
	fromContainerRule.Src = &net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.CIDRMask(32, 32)}
	fromContainerRule.Priority = networkutils.FromPodRulePriority
	fromContainerRule.Table = unix.RT_TABLE_UNSPEC

	// Mock the netlink RuleDel call to remove stale rules
	mockNetLink.EXPECT().RuleDel(toContainerRule).Return(nil)
	mockNetLink.EXPECT().RuleDel(fromContainerRule).Return(nil)
	// Mock the netlink LinkList call, omit the device that is associated with
	// the IP to simulate stale veth.
	mockNetLink.EXPECT().LinkList().Return([]netlink.Link{
		&netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "veth1"}},
	}, nil)

	// Validate assigned IPs by pod veth existence again
	err = ds.ValidateAssignedIPsByPodVethExistence()
	assert.NoError(t, err)

	// Check if the IP is unassigned
	_, _, addr = ds.eniPool.FindAddressForSandbox(key)
	assert.Nil(t, addr)
}
14 changes: 14 additions & 0 deletions pkg/ipamd/ipamd.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ const (
nodeIPPoolReconcileInterval = 60 * time.Second
decreaseIPPoolInterval = 30 * time.Second

// podIPAllocationReconcileInterval is the interval at which the IPAM
// controller reconciles the IP address allocation for pods.
podIPAllocationReconcileInterval = 5 * time.Second

// ipReconcileCooldown is the amount of time that an IP address must wait until it can be added to the data store
// during reconciliation after being discovered on the EC2 instance metadata.
ipReconcileCooldown = 60 * time.Second
Expand Down Expand Up @@ -637,6 +641,16 @@ func (c *IPAMContext) updateIPStats(unmanaged int) {
prometheusmetrics.EnisMax.Set(float64(c.maxENI - unmanaged))
}

// PodIPAllocationReconcileLoop repeatedly sleeps for
// podIPAllocationReconcileInterval and then asks the datastore to validate its
// assigned IPs against pod veth existence. It never returns, so it is meant to
// be run in its own goroutine.
func (c *IPAMContext) PodIPAllocationReconcileLoop() {
	for {
		time.Sleep(podIPAllocationReconcileInterval)
		// The error result is discarded; a failed pass is simply attempted
		// again after the next sleep.
		_ = c.dataStore.ValidateAssignedIPsByPodVethExistence()
	}
}

// StartNodeIPPoolManager monitors the IP pool, add or del them when it is required.
func (c *IPAMContext) StartNodeIPPoolManager() {
// For IPv6, if Security Groups for Pods is enabled, wait until trunk ENI is attached and add it to the datastore.
Expand Down

0 comments on commit b8af90d

Please sign in to comment.