Skip to content

Commit

Permalink
Merge branch 'master' into fixIfId
Browse files Browse the repository at this point in the history
  • Loading branch information
jiceatscion authored Jul 17, 2024
2 parents eea6875 + 8ba6b38 commit 09e388c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 19 deletions.
53 changes: 41 additions & 12 deletions router/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net"
"net/netip"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand Down Expand Up @@ -184,7 +185,7 @@ type DataPlane struct {
bfdSessions map[uint16]bfdSession
localIA addr.IA
mtx sync.Mutex
running bool
running atomic.Bool
Metrics *Metrics
forwardingMetrics map[uint16]interfaceMetrics
dispatchedPortStart uint16
Expand Down Expand Up @@ -243,11 +244,39 @@ type drkeyProvider interface {
) (drkey.ASHostKey, error)
}

// setRunning() Configures the running state of the data plane to true. setRunning() is called once
// the dataplane is finished initializing and is ready to process packets.
func (d *DataPlane) setRunning() {
d.running.Store(true)
}

// setStopping() Configures the running state of the data plane to false. This should not be called
// during the dataplane initialization. Calling this before initialization starts has no effect.
func (d *DataPlane) setStopping() {
d.running.Store(false)
}

// IsRunning() Indicates the running state of the data plane. If true, the dataplane is initialized
// and ready to process or already processing packets. In this case some configuration changes are
// not permitted. If false, the data plane is not ready to process packets yet, or is shutting
// down.
func (d *DataPlane) IsRunning() bool {
return d.running.Load()
}

// Shutdown() causes the dataplane to stop accepting packets and then terminate. Note that
// in that case the router is committed to shutting down. There is no mechanism to restart it.
func (d *DataPlane) Shutdown() {
d.mtx.Lock() // make sure we're nor racing with initialization.
defer d.mtx.Unlock()
d.setStopping()
}

// SetIA sets the local IA for the dataplane.
func (d *DataPlane) SetIA(ia addr.IA) error {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.running {
if d.IsRunning() {
return modifyExisting
}
if ia.IsZero() {
Expand All @@ -265,7 +294,7 @@ func (d *DataPlane) SetIA(ia addr.IA) error {
func (d *DataPlane) SetKey(key []byte) error {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.running {
if d.IsRunning() {
return modifyExisting
}
if len(key) == 0 {
Expand Down Expand Up @@ -297,7 +326,7 @@ func (d *DataPlane) SetPortRange(start, end uint16) {
func (d *DataPlane) AddInternalInterface(conn BatchConn, ip netip.Addr) error {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.running {
if d.IsRunning() {
return modifyExisting
}
if conn == nil {
Expand All @@ -324,7 +353,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,
d.mtx.Lock()
defer d.mtx.Unlock()

if d.running {
if d.IsRunning() {
return modifyExisting
}
if conn == nil || !src.Addr.IsValid() || !dst.Addr.IsValid() {
Expand Down Expand Up @@ -354,7 +383,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,
func (d *DataPlane) AddNeighborIA(ifID uint16, remote addr.IA) error {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.running {
if d.IsRunning() {
return modifyExisting
}
if remote.IsZero() {
Expand Down Expand Up @@ -515,7 +544,7 @@ func (d *DataPlane) AddNextHop(ifID uint16, src, dst netip.AddrPort, cfg control
d.mtx.Lock()
defer d.mtx.Unlock()

if d.running {
if d.IsRunning() {
return modifyExisting
}
if !dst.IsValid() || !src.IsValid() {
Expand Down Expand Up @@ -585,7 +614,7 @@ type RunConfig struct {

func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {
d.mtx.Lock()
d.running = true
d.setRunning()
d.initMetrics()

processorQueueSize := max(
Expand Down Expand Up @@ -718,7 +747,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,
}
}

for d.running {
for d.IsRunning() {
// collect packets.

// Give a new buffer to the msgs elements that have been used in the previous loop.
Expand Down Expand Up @@ -778,7 +807,7 @@ func (d *DataPlane) runProcessor(id int, q <-chan *packet,

log.Debug("Initialize processor with", "id", id)
processor := newPacketProcessor(d)
for d.running {
for d.IsRunning() {
p, ok := <-q
if !ok {
continue
Expand Down Expand Up @@ -835,7 +864,7 @@ func (d *DataPlane) runSlowPathProcessor(id int, q <-chan *packet,

log.Debug("Initialize slow-path processor with", "id", id)
processor := newSlowPathProcessor(d)
for d.running {
for d.IsRunning() {
p, ok := <-q
if !ok {
continue
Expand Down Expand Up @@ -1012,7 +1041,7 @@ func (d *DataPlane) runForwarder(ifID uint16, conn BatchConn, cfg *RunConfig, c
metrics := d.forwardingMetrics[ifID]

toWrite := 0
for d.running {
for d.IsRunning() {
toWrite += readUpTo(c, cfg.BatchSize-toWrite, toWrite == 0, pkts[toWrite:])

// Turn the packets into underlay messages that WriteBatch can send.
Expand Down
12 changes: 6 additions & 6 deletions router/dataplane_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestReceiver(t *testing.T) {
).Times(2)
mInternal.EXPECT().ReadBatch(gomock.Any()).DoAndReturn(
func(m underlayconn.Messages) (int, error) {
dp.running = false
dp.setStopping()
done <- true
return 0, nil
},
Expand All @@ -86,7 +86,7 @@ func TestReceiver(t *testing.T) {
dp.initPacketPool(runConfig, 64)
procCh, _, _ := initQueues(runConfig, dp.interfaces, 64)
initialPoolSize := len(dp.packetPool)
dp.running = true
dp.setRunning()
dp.initMetrics()
go func() {
dp.runReceiver(0, dp.internal, runConfig, procCh)
Expand All @@ -109,7 +109,7 @@ func TestReceiver(t *testing.T) {
// make sure that the processing routine received exactly 20 messages
if i != 20 {
t.Fail()
dp.running = false
dp.setStopping()
}
}
}
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestForwarder(t *testing.T) {
}
}
if totalCount == 255 {
ret.running = false
ret.setStopping()
done <- struct{}{}
}
if len(ms) == 0 {
Expand All @@ -180,7 +180,7 @@ func TestForwarder(t *testing.T) {
dp.initPacketPool(runConfig, 64)
_, fwCh, _ := initQueues(runConfig, dp.interfaces, 64)
initialPoolSize := len(dp.packetPool)
dp.running = true
dp.setRunning()
dp.initMetrics()
go dp.runForwarder(0, dp.internal, runConfig, fwCh[0])

Expand Down Expand Up @@ -211,7 +211,7 @@ func TestForwarder(t *testing.T) {
assert.Equal(t, initialPoolSize, len(dp.packetPool))
case <-time.After(100 * time.Millisecond):
t.Fail()
dp.running = false
dp.setStopping()
}
}

Expand Down
2 changes: 1 addition & 1 deletion router/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewDP(
}

func (d *DataPlane) FakeStart() {
d.running = true
d.setRunning()
}

func (d *DataPlane) ProcessPkt(pkt *Packet) Disposition {
Expand Down

0 comments on commit 09e388c

Please sign in to comment.