Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to reconcile allocated Pod IPs. #3113

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/aws-k8s-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func _main() int {
// Pool manager
go ipamContext.StartNodeIPPoolManager()

// Pod IP allocation reconcile loop to clear up dangling pod IPs.
go ipamContext.PodIPAllocationReconcileLoop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an initial review, I am hesitant to add this additional 1 minute delay for the IP Sync in Reconcile Loop. I am not sure of the need for this. I will try to understand the problem you encountered and see if there is any other approach to resolve it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what you mean by 1 minute delay. This is not adding to the reconcile loop that reconciles pod IP allocation against IPs assigned to the node but instead is just cleaning up pod IPs that may not be in-use anymore because the corresponding pod is gone. This is a separate goroutine that runs a loop that just does that and should not add any delay to the regular IP reconcile loop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the need for this separate routine is itself introducing some concerns here. We will look into why this is strictly needed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any update on this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping


if !utils.GetBoolAsStringEnvVar(envDisableMetrics, false) {
// Prometheus metrics
go metrics.ServeMetrics(metricsPort)
Expand Down
40 changes: 40 additions & 0 deletions pkg/ipamd/datastore/data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,45 @@ func (ds *DataStore) normalizeCheckpointDataByPodVethExistence(checkpoint Checkp
return checkpoint, nil
}

// ValidateAssignedIPsByPodVethExistence walks every assigned IP in the
// datastore and checks that a host-side veth for the owning pod still exists.
// Allocations without a matching veth are unassigned and their leftover IP
// rules pruned. Returns an error only if the host links cannot be listed.
func (ds *DataStore) ValidateAssignedIPsByPodVethExistence() error {
	ds.lock.Lock()
	defer ds.lock.Unlock()

	// Snapshot the host network namespace links once up front; every
	// allocation below is validated against this single listing.
	hostNSLinks, err := ds.netLink.LinkList()
	if err != nil {
		return err
	}

	var stale []CheckpointEntry
	for _, eni := range ds.eniPool {
		for _, cidr := range eni.AvailableIPv4Cidrs {
			for _, ip := range cidr.IPAddresses {
				// Only currently-assigned addresses need a live veth.
				if !ip.Assigned() {
					continue
				}
				entry := CheckpointEntry{
					IPAMKey:             ip.IPAMKey,
					IPv4:                ip.Address,
					AllocationTimestamp: ip.AssignedTime.UnixNano(),
					Metadata:            ip.IPAMMetadata,
				}
				if vethErr := ds.validateAllocationByPodVethExistence(entry, hostNSLinks); vethErr != nil {
					ds.log.Warnf("stale IP allocation for ID(%v): IPv4(%v) due to %v", entry.ContainerID, entry.IPv4, vethErr)
					stale = append(stale, entry)
					// Caller already holds ds.lock, so the unsafe variant is correct here.
					ds.unassignPodIPAddressUnsafe(ip)
				}
			}
		}
	}

	// Stale allocations may have dangling IP rules that need cleanup
	if len(stale) > 0 {
		ds.PruneStaleAllocations(stale)
	}

	return nil
}

func (ds *DataStore) validateAllocationByPodVethExistence(allocation CheckpointEntry, hostNSLinks []netlink.Link) error {
// for backwards compatibility, we skip the validation when metadata contains empty namespace/name.
if allocation.Metadata.K8SPodNamespace == "" || allocation.Metadata.K8SPodName == "" {
Expand All @@ -1466,6 +1505,7 @@ func (ds *DataStore) validateAllocationByPodVethExistence(allocation CheckpointE
linkNameSuffix := networkutils.GeneratePodHostVethNameSuffix(allocation.Metadata.K8SPodNamespace, allocation.Metadata.K8SPodName)
for _, link := range hostNSLinks {
linkName := link.Attrs().Name
fmt.Printf("linkName: %v, linkNameSuffix: %v\n", linkName, linkNameSuffix)
if strings.HasSuffix(linkName, linkNameSuffix) {
return nil
}
Expand Down
74 changes: 74 additions & 0 deletions pkg/ipamd/datastore/data_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,3 +1615,77 @@ func TestForceRemovalMetrics(t *testing.T) {
eniCount = testutil.ToFloat64(prometheusmetrics.ForceRemovedENIs)
assert.Equal(t, float64(1), eniCount)
}

// TestValidateAssignedIPsByPodVethExistence verifies that an assigned IP is
// kept while its pod's host veth exists, and is unassigned (with its stale IP
// rules deleted) once the veth disappears from the host namespace.
func TestValidateAssignedIPsByPodVethExistence(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockNetLink := mock_netlinkwrapper.NewMockNetLink(ctrl)
	ds := &DataStore{
		eniPool:          make(ENIPool),
		log:              Testlog,
		netLink:          mockNetLink,
		backingStore:     NullCheckpoint{},
		ipCooldownPeriod: getCooldownPeriod(),
	}

	deviceName := "eni-" + networkutils.GeneratePodHostVethNameSuffix("default", "sample-pod-1")
	// Mock the netlink LinkList call: the pod's veth is present.
	mockNetLink.EXPECT().LinkList().Return([]netlink.Link{
		&netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: deviceName}},
		&netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "veth1"}},
	}, nil)

	// Add ENIs and IPs to the datastore
	err := ds.AddENI("eni-1", 1, true, false, false)
	assert.NoError(t, err)

	ipv4Addr := net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.IPv4Mask(255, 255, 255, 255)}
	err = ds.AddIPv4CidrToStore("eni-1", ipv4Addr, false)
	assert.NoError(t, err)

	key := IPAMKey{"net0", "sandbox-1", deviceName}
	ipamMetadata := IPAMMetadata{K8SPodNamespace: "default", K8SPodName: "sample-pod-1"}
	_, _, err = ds.AssignPodIPv4Address(key, ipamMetadata)
	assert.NoError(t, err)

	// Validate assigned IPs by pod veth existence
	err = ds.ValidateAssignedIPsByPodVethExistence()
	assert.NoError(t, err)

	// The veth exists, so the IP must still be assigned.
	eni, _, addr := ds.eniPool.FindAddressForSandbox(key)
	assert.NotNil(t, addr)
	assert.Equal(t, "1.1.1.1", addr.Address)
	assert.Equal(t, 1, eni.DeviceNumber)

	mockNetLink.EXPECT().NewRule().DoAndReturn(func() *netlink.Rule { return netlink.NewRule() }).Times(2)

	// Build the two rules the cleanup is expected to delete for the stale IP.
	toContainerRule := netlink.NewRule()
	toContainerRule.Dst = &net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.CIDRMask(32, 32)}
	toContainerRule.Priority = networkutils.ToContainerRulePriority
	toContainerRule.Table = unix.RT_TABLE_MAIN

	fromContainerRule := netlink.NewRule()
	fromContainerRule.Src = &net.IPNet{IP: net.ParseIP("1.1.1.1"), Mask: net.CIDRMask(32, 32)}
	fromContainerRule.Priority = networkutils.FromPodRulePriority
	fromContainerRule.Table = unix.RT_TABLE_UNSPEC

	// Mock the netlink RuleDel call to remove stale rules
	mockNetLink.EXPECT().RuleDel(toContainerRule).Return(nil)
	mockNetLink.EXPECT().RuleDel(fromContainerRule).Return(nil)
	// Mock the netlink LinkList call, omit the device that is associated with
	// the IP to simulate stale veth.
	mockNetLink.EXPECT().LinkList().Return([]netlink.Link{
		&netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "veth1"}},
	}, nil)

	// Validate assigned IPs by pod veth existence again
	err = ds.ValidateAssignedIPsByPodVethExistence()
	assert.NoError(t, err)

	// The veth is gone, so the IP must now be unassigned.
	_, _, addr = ds.eniPool.FindAddressForSandbox(key)
	assert.Nil(t, addr)
}
14 changes: 14 additions & 0 deletions pkg/ipamd/ipamd.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ const (
nodeIPPoolReconcileInterval = 60 * time.Second
decreaseIPPoolInterval = 30 * time.Second

// podIPAllocationReconcileInterval is the interval at which the IPAM
// controller reconciles the IP address allocation for pods.
podIPAllocationReconcileInterval = 5 * time.Second

// ipReconcileCooldown is the amount of time that an IP address must wait until it can be added to the data store
// during reconciliation after being discovered on the EC2 instance metadata.
ipReconcileCooldown = 60 * time.Second
Expand Down Expand Up @@ -637,6 +641,16 @@ func (c *IPAMContext) updateIPStats(unmanaged int) {
prometheusmetrics.EnisMax.Set(float64(c.maxENI - unmanaged))
}

// PodIPAllocationReconcileLoop periodically reconciles the datastore's view of
// allocated pod IPs against the pod veths actually present on the node,
// releasing allocations whose pod is gone. It runs forever and is intended to
// be launched as its own goroutine from main.
func (c *IPAMContext) PodIPAllocationReconcileLoop() {
	for {
		time.Sleep(podIPAllocationReconcileInterval)
		// Validation is best-effort: a failure (e.g. listing host links)
		// is logged and retried on the next tick.
		if err := c.dataStore.ValidateAssignedIPsByPodVethExistence(); err != nil {
			log.Warnf("Failed to reconcile pod IP allocations: %v", err)
		}
	}
}

// StartNodeIPPoolManager monitors the IP pool, add or del them when it is required.
func (c *IPAMContext) StartNodeIPPoolManager() {
// For IPv6, if Security Groups for Pods is enabled, wait until trunk ENI is attached and add it to the datastore.
Expand Down