Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router: implement a proper shutdown mechanism #4578

Merged
merged 3 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 41 additions & 12 deletions router/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net"
"net/netip"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand Down Expand Up @@ -184,7 +185,7 @@ type DataPlane struct {
bfdSessions map[uint16]bfdSession
localIA addr.IA
mtx sync.Mutex
running bool
running atomic.Bool
Metrics *Metrics
forwardingMetrics map[uint16]interfaceMetrics
dispatchedPortStart uint16
Expand Down Expand Up @@ -243,11 +244,39 @@ type drkeyProvider interface {
) (drkey.ASHostKey, error)
}

// setRunning marks the data plane as running by atomically storing true in d.running.
// Run calls it right after acquiring d.mtx, before the rest of its setup; from then on
// IsRunning reports true and the configuration methods that check IsRunning refuse to
// modify the data plane.
func (d *DataPlane) setRunning() {
d.running.Store(true)
}

// setStopping marks the data plane as not running by atomically storing false in d.running.
// It should not be called during data plane initialization. Because false is also the zero
// value of the flag, calling it before the data plane has been started has no effect.
func (d *DataPlane) setStopping() {
d.running.Store(false)
}

// IsRunning reports the running state of the data plane by atomically loading d.running.
// If true, the data plane is initialized and ready to process, or already processing,
// packets; in that state some configuration changes are not permitted. If false, the data
// plane is not yet ready to process packets, or is shutting down.
func (d *DataPlane) IsRunning() bool {
return d.running.Load()
}

// Shutdown causes the data plane to stop accepting packets and then terminate. Once it has
// been called the router is committed to shutting down; there is no mechanism to restart it.
// The running flag is cleared while holding d.mtx.
//
// NOTE(review): the worker loops test IsRunning only at the top of each iteration, so a loop
// blocked in a read or a channel receive presumably notices the shutdown only after it wakes
// up - confirm against the rest of Run.
func (d *DataPlane) Shutdown() {
d.mtx.Lock() // Make sure we are not racing with initialization, which also takes d.mtx.
defer d.mtx.Unlock()
d.setStopping()
}

// SetIA sets the local IA for the dataplane.
func (d *DataPlane) SetIA(ia addr.IA) error {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.running {
if d.IsRunning() {
return modifyExisting
}
if ia.IsZero() {
Expand All @@ -265,7 +294,7 @@ func (d *DataPlane) SetIA(ia addr.IA) error {
func (d *DataPlane) SetKey(key []byte) error {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.running {
if d.IsRunning() {
return modifyExisting
}
if len(key) == 0 {
Expand Down Expand Up @@ -297,7 +326,7 @@ func (d *DataPlane) SetPortRange(start, end uint16) {
func (d *DataPlane) AddInternalInterface(conn BatchConn, ip netip.Addr) error {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.running {
if d.IsRunning() {
return modifyExisting
}
if conn == nil {
Expand All @@ -324,7 +353,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,
d.mtx.Lock()
defer d.mtx.Unlock()

if d.running {
if d.IsRunning() {
return modifyExisting
}
if conn == nil || !src.Addr.IsValid() || !dst.Addr.IsValid() {
Expand Down Expand Up @@ -354,7 +383,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,
func (d *DataPlane) AddNeighborIA(ifID uint16, remote addr.IA) error {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.running {
if d.IsRunning() {
return modifyExisting
}
if remote.IsZero() {
Expand Down Expand Up @@ -515,7 +544,7 @@ func (d *DataPlane) AddNextHop(ifID uint16, src, dst netip.AddrPort, cfg control
d.mtx.Lock()
defer d.mtx.Unlock()

if d.running {
if d.IsRunning() {
return modifyExisting
}
if !dst.IsValid() || !src.IsValid() {
Expand Down Expand Up @@ -585,7 +614,7 @@ type RunConfig struct {

func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {
d.mtx.Lock()
d.running = true
d.setRunning()
d.initMetrics()

processorQueueSize := max(
Expand Down Expand Up @@ -718,7 +747,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,
}
}

for d.running {
for d.IsRunning() {
// collect packets.

// Give a new buffer to the msgs elements that have been used in the previous loop.
Expand Down Expand Up @@ -778,7 +807,7 @@ func (d *DataPlane) runProcessor(id int, q <-chan *packet,

log.Debug("Initialize processor with", "id", id)
processor := newPacketProcessor(d)
for d.running {
for d.IsRunning() {
p, ok := <-q
if !ok {
continue
Expand Down Expand Up @@ -835,7 +864,7 @@ func (d *DataPlane) runSlowPathProcessor(id int, q <-chan *packet,

log.Debug("Initialize slow-path processor with", "id", id)
processor := newSlowPathProcessor(d)
for d.running {
for d.IsRunning() {
p, ok := <-q
if !ok {
continue
Expand Down Expand Up @@ -1012,7 +1041,7 @@ func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn, cfg *RunConfig, c
metrics := d.forwardingMetrics[ifID]

toWrite := 0
for d.running {
for d.IsRunning() {
toWrite += readUpTo(c, cfg.BatchSize-toWrite, toWrite == 0, pkts[toWrite:])

// Turn the packets into underlay messages that WriteBatch can send.
Expand Down
12 changes: 6 additions & 6 deletions router/dataplane_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestReceiver(t *testing.T) {
).Times(2)
mInternal.EXPECT().ReadBatch(gomock.Any()).DoAndReturn(
func(m underlayconn.Messages) (int, error) {
dp.running = false
dp.setStopping()
done <- true
return 0, nil
},
Expand All @@ -86,7 +86,7 @@ func TestReceiver(t *testing.T) {
dp.initPacketPool(runConfig, 64)
procCh, _, _ := initQueues(runConfig, dp.interfaces, 64)
initialPoolSize := len(dp.packetPool)
dp.running = true
dp.setRunning()
dp.initMetrics()
go func() {
dp.runReceiver(0, dp.internal, runConfig, procCh)
Expand All @@ -109,7 +109,7 @@ func TestReceiver(t *testing.T) {
// make sure that the processing routine received exactly 20 messages
if i != 20 {
t.Fail()
dp.running = false
dp.setStopping()
}
}
}
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestForwarder(t *testing.T) {
}
}
if totalCount == 255 {
ret.running = false
ret.setStopping()
done <- struct{}{}
}
if len(ms) == 0 {
Expand All @@ -180,7 +180,7 @@ func TestForwarder(t *testing.T) {
dp.initPacketPool(runConfig, 64)
_, fwCh, _ := initQueues(runConfig, dp.interfaces, 64)
initialPoolSize := len(dp.packetPool)
dp.running = true
dp.setRunning()
dp.initMetrics()
go dp.runForwarder(0, dp.internal, runConfig, fwCh[0])

Expand Down Expand Up @@ -211,7 +211,7 @@ func TestForwarder(t *testing.T) {
assert.Equal(t, initialPoolSize, len(dp.packetPool))
case <-time.After(100 * time.Millisecond):
t.Fail()
dp.running = false
dp.setStopping()
}
}

Expand Down
2 changes: 1 addition & 1 deletion router/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewDP(
}

func (d *DataPlane) FakeStart() {
d.running = true
d.setRunning()
}

func (d *DataPlane) ProcessPkt(pkt *Packet) Disposition {
Expand Down
Loading