From 55a0925f59af7bf322606459d167eac086d7a660 Mon Sep 17 00:00:00 2001 From: Jean-Christophe Hugly Date: Tue, 16 Jul 2024 18:07:13 +0200 Subject: [PATCH] Implement a properly synchronized shutdown flag (and an entry point to set it). --- router/dataplane.go | 53 ++++++++++++++++++++++++------- router/dataplane_internal_test.go | 12 +++---- router/export_test.go | 2 +- 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/router/dataplane.go b/router/dataplane.go index 2fac11ac1c..8617230efa 100644 --- a/router/dataplane.go +++ b/router/dataplane.go @@ -27,6 +27,7 @@ import ( "net" "net/netip" "sync" + "sync/atomic" "time" "unsafe" @@ -184,7 +185,7 @@ type DataPlane struct { bfdSessions map[uint16]bfdSession localIA addr.IA mtx sync.Mutex - running bool + running atomic.Bool Metrics *Metrics forwardingMetrics map[uint16]interfaceMetrics dispatchedPortStart uint16 @@ -243,11 +244,39 @@ type drkeyProvider interface { ) (drkey.ASHostKey, error) } +// setRunning() configures the running state of the data plane to true. setRunning() is called once +// the dataplane is finished initializing and is ready to process packets. +func (d *DataPlane) setRunning() { + d.running.Store(true) +} + +// setStopping() configures the running state of the data plane to false. This should not be called +// during the dataplane initialization. Calling this before initialization starts has no effect. +func (d *DataPlane) setStopping() { + d.running.Store(false) +} + +// IsRunning() indicates the running state of the data plane. If true, the dataplane is initialized +// and ready to process or already processing packets. In this case some configuration changes are +// not permitted. If false, the data plane is not ready to process packets yet, or is shutting +// down. +func (d *DataPlane) IsRunning() bool { + return d.running.Load() +} + +// Shutdown() causes the dataplane to stop accepting packets and then terminate. 
Note that +// in that case the router is committed to shutting down. There is no mechanism to restart it. +func (d *DataPlane) Shutdown() { + d.mtx.Lock() // make sure we're not racing with initialization. + defer d.mtx.Unlock() + d.setStopping() +} + // SetIA sets the local IA for the dataplane. func (d *DataPlane) SetIA(ia addr.IA) error { d.mtx.Lock() defer d.mtx.Unlock() - if d.running { + if d.IsRunning() { return modifyExisting } if ia.IsZero() { @@ -265,7 +294,7 @@ func (d *DataPlane) SetIA(ia addr.IA) error { func (d *DataPlane) SetKey(key []byte) error { d.mtx.Lock() defer d.mtx.Unlock() - if d.running { + if d.IsRunning() { return modifyExisting } if len(key) == 0 { @@ -297,7 +326,7 @@ func (d *DataPlane) SetPortRange(start, end uint16) { func (d *DataPlane) AddInternalInterface(conn BatchConn, ip netip.Addr) error { d.mtx.Lock() defer d.mtx.Unlock() - if d.running { + if d.IsRunning() { return modifyExisting } if conn == nil { @@ -324,7 +353,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn, d.mtx.Lock() defer d.mtx.Unlock() - if d.running { + if d.IsRunning() { return modifyExisting } if conn == nil || !src.Addr.IsValid() || !dst.Addr.IsValid() { @@ -354,7 +383,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn, func (d *DataPlane) AddNeighborIA(ifID uint16, remote addr.IA) error { d.mtx.Lock() defer d.mtx.Unlock() - if d.running { + if d.IsRunning() { return modifyExisting } if remote.IsZero() { @@ -515,7 +544,7 @@ func (d *DataPlane) AddNextHop(ifID uint16, src, dst netip.AddrPort, cfg control d.mtx.Lock() defer d.mtx.Unlock() - if d.running { + if d.IsRunning() { return modifyExisting } if !dst.IsValid() || !src.IsValid() { @@ -585,7 +614,7 @@ type RunConfig struct { func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error { d.mtx.Lock() - d.running = true + d.setRunning() d.initMetrics() processorQueueSize := max( @@ -718,7 +747,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, 
cfg *RunConfig, } } - for d.running { + for d.IsRunning() { // collect packets. // Give a new buffer to the msgs elements that have been used in the previous loop. @@ -778,7 +807,7 @@ func (d *DataPlane) runProcessor(id int, q <-chan *packet, log.Debug("Initialize processor with", "id", id) processor := newPacketProcessor(d) - for d.running { + for d.IsRunning() { p, ok := <-q if !ok { continue @@ -835,7 +864,7 @@ func (d *DataPlane) runSlowPathProcessor(id int, q <-chan *packet, log.Debug("Initialize slow-path processor with", "id", id) processor := newSlowPathProcessor(d) - for d.running { + for d.IsRunning() { p, ok := <-q if !ok { continue @@ -1012,7 +1041,7 @@ func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn, cfg *RunConfig, c metrics := d.forwardingMetrics[ifID] toWrite := 0 - for d.running { + for d.IsRunning() { toWrite += readUpTo(c, cfg.BatchSize-toWrite, toWrite == 0, pkts[toWrite:]) // Turn the packets into underlay messages that WriteBatch can send. diff --git a/router/dataplane_internal_test.go b/router/dataplane_internal_test.go index 6c96b149bb..5c02410a87 100644 --- a/router/dataplane_internal_test.go +++ b/router/dataplane_internal_test.go @@ -71,7 +71,7 @@ func TestReceiver(t *testing.T) { ).Times(2) mInternal.EXPECT().ReadBatch(gomock.Any()).DoAndReturn( func(m underlayconn.Messages) (int, error) { - dp.running = false + dp.setStopping() done <- true return 0, nil }, @@ -86,7 +86,7 @@ func TestReceiver(t *testing.T) { dp.initPacketPool(runConfig, 64) procCh, _, _ := initQueues(runConfig, dp.interfaces, 64) initialPoolSize := len(dp.packetPool) - dp.running = true + dp.setRunning() dp.initMetrics() go func() { dp.runReceiver(0, dp.internal, runConfig, procCh) @@ -109,7 +109,7 @@ func TestReceiver(t *testing.T) { // make sure that the processing routine received exactly 20 messages if i != 20 { t.Fail() - dp.running = false + dp.setStopping() } } } @@ -160,7 +160,7 @@ func TestForwarder(t *testing.T) { } } if totalCount == 255 { - 
ret.running = false + ret.setStopping() done <- struct{}{} } if len(ms) == 0 { @@ -180,7 +180,7 @@ func TestForwarder(t *testing.T) { dp.initPacketPool(runConfig, 64) _, fwCh, _ := initQueues(runConfig, dp.interfaces, 64) initialPoolSize := len(dp.packetPool) - dp.running = true + dp.setRunning() dp.initMetrics() go dp.runForwarder(0, dp.internal, runConfig, fwCh[0]) @@ -211,7 +211,7 @@ func TestForwarder(t *testing.T) { assert.Equal(t, initialPoolSize, len(dp.packetPool)) case <-time.After(100 * time.Millisecond): t.Fail() - dp.running = false + dp.setStopping() } } diff --git a/router/export_test.go b/router/export_test.go index b175754402..e5f4cc0d18 100644 --- a/router/export_test.go +++ b/router/export_test.go @@ -96,7 +96,7 @@ func NewDP( } func (d *DataPlane) FakeStart() { - d.running = true + d.setRunning() } func (d *DataPlane) ProcessPkt(pkt *Packet) Disposition {