Skip to content
Snippets Groups Projects
Unverified Commit 504add4c authored by Mohamed S. Mahmoud's avatar Mohamed S. Mahmoud Committed by GitHub
Browse files

use cached api to retrieve interface name to avoid highcpu (#251)

parent 3f14e8e3
No related branches found
No related tags found
No related merge requests found
......@@ -422,7 +422,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
rbTracer.SendsTo(accounter)
if f.cfg.Deduper == DeduperFirstCome {
deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge),
deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge, f.interfaceNamer),
node.ChannelBufferLen(f.cfg.BuffersLength))
mapTracer.SendsTo(deduper)
accounter.SendsTo(deduper)
......
......@@ -8,7 +8,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
)
var dlog = logrus.WithField("component", "flow/Deduper")
......@@ -40,7 +39,7 @@ type entry struct {
// (no activity for it during the expiration time)
// The justMark argument tells that the deduper should not drop the duplicate flows but
// set their Duplicate field.
func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan []*Record, out chan<- []*Record) {
func Dedupe(expireTime time.Duration, justMark, mergeDup bool, ifaceNamer InterfaceNamer) func(in <-chan []*Record, out chan<- []*Record) {
cache := &deduperCache{
expire: expireTime,
entries: list.New(),
......@@ -51,7 +50,7 @@ func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan []
cache.removeExpired()
fwd := make([]*Record, 0, len(records))
for _, record := range records {
cache.checkDupe(record, justMark, mergeDup, &fwd)
cache.checkDupe(record, justMark, mergeDup, &fwd, ifaceNamer)
}
if len(fwd) > 0 {
out <- fwd
......@@ -61,7 +60,7 @@ func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan []
}
// checkDupe check current record if its already available nad if not added to fwd records list
func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Record) {
func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Record, ifaceNamer InterfaceNamer) {
mergeEntry := make(map[string]uint8)
rk := r.Id
// zeroes fields from key that should be ignored from the flow comparison
......@@ -95,7 +94,7 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
*fwd = append(*fwd, r)
}
if mergeDup {
ifName := utils.GetInterfaceName(r.Id.IfIndex)
ifName := ifaceNamer(int(r.Id.IfIndex))
mergeEntry[ifName] = r.Id.Direction
if dupEntryNew(*fEntry.dupList, mergeEntry) {
*fEntry.dupList = append(*fEntry.dupList, mergeEntry)
......@@ -122,7 +121,7 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
expiryTime: timeNow().Add(c.expire),
}
if mergeDup {
ifName := utils.GetInterfaceName(r.Id.IfIndex)
ifName := ifaceNamer(int(r.Id.IfIndex))
mergeEntry[ifName] = r.Id.Direction
r.DupList = append(r.DupList, mergeEntry)
e.dupList = &r.DupList
......
package flow
import (
"net"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
)
var (
......@@ -70,7 +70,7 @@ func TestDedupe(t *testing.T) {
input := make(chan []*Record, 100)
output := make(chan []*Record, 100)
go Dedupe(time.Minute, false, false)(input, output)
go Dedupe(time.Minute, false, false, interfaceNamer)(input, output)
input <- []*Record{
oneIf2, // record 1 at interface 2: should be accepted
......@@ -108,7 +108,7 @@ func TestDedupe_EvictFlows(t *testing.T) {
input := make(chan []*Record, 100)
output := make(chan []*Record, 100)
go Dedupe(15*time.Second, false, false)(input, output)
go Dedupe(15*time.Second, false, false, interfaceNamer)(input, output)
// Should only accept records 1 and 2, at interface 1
input <- []*Record{oneIf1, twoIf1, oneIf2}
......@@ -143,7 +143,7 @@ func TestDedupeMerge(t *testing.T) {
input := make(chan []*Record, 100)
output := make(chan []*Record, 100)
go Dedupe(time.Minute, false, true)(input, output)
go Dedupe(time.Minute, false, true, interfaceNamer)(input, output)
input <- []*Record{
oneIf2, // record 1 at interface 2: should be accepted
......@@ -155,10 +155,10 @@ func TestDedupeMerge(t *testing.T) {
expectedMap := []map[string]uint8{
{
utils.GetInterfaceName(oneIf2.Id.IfIndex): oneIf2.Id.Direction,
interfaceNamer(int(oneIf2.Id.IfIndex)): oneIf2.Id.Direction,
},
{
utils.GetInterfaceName(oneIf1.Id.IfIndex): oneIf1.Id.Direction,
interfaceNamer(int(oneIf1.Id.IfIndex)): oneIf1.Id.Direction,
},
}
......@@ -174,3 +174,11 @@ type timerMock struct {
func (tm *timerMock) Now() time.Time {
return tm.now
}
func interfaceNamer(ifIndex int) string {
iface, err := net.InterfaceByIndex(ifIndex)
if err != nil {
return "unknown"
}
return iface.Name
}
......@@ -91,11 +91,3 @@ func utsnameStr[T int8 | uint8](in []T) string {
}
return string(out)
}
func GetInterfaceName(ifIndex uint32) string {
iface, err := net.InterfaceByIndex(int(ifIndex))
if err != nil {
return "unknown"
}
return iface.Name
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment