diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 0c645287439c3371bca52c236ef1547d38caf76e..c467ad0303b4baea4d8bdb8233cb58319c65e967 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -296,9 +296,9 @@ func flowsAgent(cfg *Config, m *metrics.Metrics, samplingGauge := m.CreateSamplingRate() samplingGauge.Set(float64(cfg.Sampling)) - mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m, s) + mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m, s, cfg.EnableUDNMapping) rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m) - accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m, s) + accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m, s, cfg.EnableUDNMapping) limiter := flow.NewCapacityLimiter(m) return &Flows{ diff --git a/pkg/exporter/converters_test.go b/pkg/exporter/converters_test.go index ee997981df38befa5d122a482cd30d7435d35402..cc72779d8a33039b5d3f97388c21f92d843e2ba3 100644 --- a/pkg/exporter/converters_test.go +++ b/pkg/exporter/converters_test.go @@ -55,7 +55,7 @@ func TestConversions(t *testing.T) { }, }, }, - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -103,7 +103,7 @@ func TestConversions(t *testing.T) { Sampling: 2, }, }, - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -149,7 +149,7 @@ func TestConversions(t *testing.T) { Dscp: 64, }, }, - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -194,7 +194,7 @@ func TestConversions(t *testing.T) { Dscp: 64, }, }, - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -240,7 +240,7 @@ func TestConversions(t *testing.T) { Packets: 128, }, }, - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -287,7 +287,7 @@ func TestConversions(t *testing.T) { }, }, }, - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -347,7 +347,7 @@ func TestConversions(t *testing.T) { }, }, }, - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -412,8 +412,8 @@ func TestConversions(t *testing.T) { }, }, Interfaces: []model.IntfDirUdn{ - model.NewIntfDirUdn("5e6e92caa1d51cf", model.DirectionIngress, nil, nil), - model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil), + model.NewIntfDirUdn("5e6e92caa1d51cf", model.DirectionIngress, nil), + model.NewIntfDirUdn("eth0", model.DirectionEgress, nil), }, TimeFlowStart: someTime, TimeFlowEnd: someTime, diff --git a/pkg/exporter/grpc_proto_test.go b/pkg/exporter/grpc_proto_test.go index 705cd36a611ddac8ecf14988c62b9e11725e6e78..7f4448dbf5b2d21e2c7b7a054d39d67fbc7a8c6f 100644 --- a/pkg/exporter/grpc_proto_test.go +++ b/pkg/exporter/grpc_proto_test.go @@ -123,7 +123,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) { for i := 0; i < 25000; i++ { input = append(input, &model.Record{Metrics: model.BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{ EthProtocol: model.IPv6Type, - }}, AgentIP: net.ParseIP("1111::1111"), Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("12345678", 0, nil, nil)}}) + }}, AgentIP: net.ParseIP("1111::1111"), Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("12345678", 0, nil)}}) } flows <- input go exporter.ExportFlows(flows) diff --git a/pkg/exporter/kafka_proto_test.go b/pkg/exporter/kafka_proto_test.go index 6a6b98adcebc08c5a921abc2bfb0e118c75a1e6a..3e3f2f790a8530c27f71d1ded5c4d5360ab65a48 100644 --- a/pkg/exporter/kafka_proto_test.go +++ b/pkg/exporter/kafka_proto_test.go @@ -51,7 +51,7 @@ func TestProtoConversion(t *testing.T) { Flags: uint16(1), }, }, - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil, nil), model.NewIntfDirUdn("abcde", 1, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil), model.NewIntfDirUdn("abcde", 1, nil)}, } input <- []*model.Record{&record} @@ -108,7 +108,7 @@ func TestIdenticalKeys(t *testing.T) { Flags: uint16(1), }, }, - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil, nil), model.NewIntfDirUdn("abcde", 1, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil), model.NewIntfDirUdn("abcde", 1, nil)}, } key1 := getFlowKey(&record) diff --git a/pkg/flow/account.go b/pkg/flow/account.go index 6d87806bbdfdb6a11ac28394397e201e6ed48d0a..c906e180a6ca0e157c9b6183da11718853f637c4 100644 --- a/pkg/flow/account.go +++ b/pkg/flow/account.go @@ -1,6 +1,7 @@ package flow import ( + "maps" "time" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" @@ -23,6 +24,7 @@ type Accounter struct { monoClock func() time.Duration metrics *metrics.Metrics s *ovnobserv.SampleDecoder + udnEnabled bool } var alog = logrus.WithField("component", "flow/Accounter") @@ -35,6 +37,7 @@ func NewAccounter( monoClock func() time.Duration, m *metrics.Metrics, s *ovnobserv.SampleDecoder, + udnEnabled bool, ) *Accounter { acc := Accounter{ maxEntries: maxEntries, @@ -44,6 +47,7 @@ func NewAccounter( monoClock: monoClock, metrics: m, s: s, + udnEnabled: udnEnabled, } return &acc } @@ -99,9 +103,19 @@ func (c *Accounter) evict(entries map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics, evict now := c.clock() monotonicNow := uint64(c.monoClock()) records := make([]*model.Record, 0, len(entries)) + udnCache := make(map[string]string) + if c.s != nil && c.udnEnabled { + udnsMap, err := c.s.GetInterfaceUDNs() + if err != nil { + alog.Errorf("failed to get udns to interfaces map : %v", err) + } else { + maps.Copy(udnCache, udnsMap) + alog.Tracef("GetInterfaceUDNS map: %v", udnCache) + } + } for key, metrics := range entries { flowContent := model.NewBpfFlowContent(*metrics) - records = append(records, model.NewRecord(key, &flowContent, now, monotonicNow, c.s)) + records = append(records, model.NewRecord(key, &flowContent, now, monotonicNow, c.s, udnCache)) } c.metrics.EvictionCounter.WithSourceAndReason("accounter", reason).Inc() c.metrics.EvictedFlowsCounter.WithSourceAndReason("accounter", reason).Add(float64(len(records))) diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go index 56c037566e9bf1c710427753974f9d26385601a0..74d30281578f1191de455dc61a18d1f12813407a 100644 --- a/pkg/flow/account_test.go +++ b/pkg/flow/account_test.go @@ -51,7 +51,7 @@ func TestEvict_MaxEntries(t *testing.T) { return now }, func() time.Duration { return 1000 - }, metrics.NewMetrics(&metrics.Settings{}), nil) + }, metrics.NewMetrics(&metrics.Settings{}), nil, false) // WHEN it starts accounting new records inputs := make(chan *model.RawRecord, 20) @@ -111,7 +111,7 @@ func TestEvict_MaxEntries(t *testing.T) { }, TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond), TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond), - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)}, }, k2: { ID: k2, @@ -122,7 +122,7 @@ func TestEvict_MaxEntries(t *testing.T) { }, TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond), TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond), - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)}, }, }, received) } @@ -134,7 +134,7 @@ func TestEvict_Period(t *testing.T) { return now }, func() time.Duration { return 1000 - }, metrics.NewMetrics(&metrics.Settings{}), nil) + }, metrics.NewMetrics(&metrics.Settings{}), nil, false) // WHEN it starts accounting new records inputs := make(chan *model.RawRecord, 20) @@ -191,7 +191,7 @@ func TestEvict_Period(t *testing.T) { }, TimeFlowStart: now.Add(-1000 + 123), TimeFlowEnd: now.Add(-1000 + 789), - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)}, }, *records[0]) records = receiveTimeout(t, evictor) require.Len(t, records, 1) @@ -208,7 +208,7 @@ func TestEvict_Period(t *testing.T) { }, TimeFlowStart: now.Add(-1000 + 1123), TimeFlowEnd: now.Add(-1000 + 1456), - Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil, nil)}, + Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)}, }, *records[0]) // no more flows are evicted diff --git a/pkg/flow/limiter_test.go b/pkg/flow/limiter_test.go index 587e232e0b40b0c66242bdf45c1c90e7174ba89e..2f4a419d71470b32abe669a843913ffb1e23f1af 100644 --- a/pkg/flow/limiter_test.go +++ b/pkg/flow/limiter_test.go @@ -19,7 +19,7 @@ func TestCapacityLimiter_NoDrop(t *testing.T) { // WHEN it buffers less elements than it's maximum capacity for i := 0; i < 33; i++ { - pipeIn <- []*model.Record{{Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn(strconv.Itoa(i), 0, nil, nil)}}} + pipeIn <- []*model.Record{{Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn(strconv.Itoa(i), 0, nil)}}} } // THEN it is able to retrieve all the buffered elements @@ -45,7 +45,7 @@ func TestCapacityLimiter_Drop(t *testing.T) { // WHEN it receives more elements than its maximum capacity // (it's not blocking) for i := 0; i < limiterLen*2; i++ { - pipeIn <- []*model.Record{{Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn(strconv.Itoa(i), 0, nil, nil)}}} + pipeIn <- []*model.Record{{Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn(strconv.Itoa(i), 0, nil)}}} } // THEN it is only able to retrieve all the nth first buffered elements diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go index 7fa12bd07573b27afcfeb1d923cebc1357174345..6a77cfc9bf4250b329eb689792ba8b44129c82fe 100644 --- a/pkg/flow/tracer_map.go +++ b/pkg/flow/tracer_map.go @@ -2,6 +2,7 @@ package flow import ( "context" + "maps" "runtime" "sync" "time" @@ -30,6 +31,7 @@ type MapTracer struct { metrics *metrics.Metrics timeSpentinLookupAndDelete prometheus.Histogram s *ovnobserv.SampleDecoder + udnEnabled bool } type mapFetcher interface { @@ -38,7 +40,7 @@ type mapFetcher interface { } func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout time.Duration, m *metrics.Metrics, - s *ovnobserv.SampleDecoder) *MapTracer { + s *ovnobserv.SampleDecoder, udnEnabled bool) *MapTracer { return &MapTracer{ mapFetcher: fetcher, evictionTimeout: evictionTimeout, @@ -47,6 +49,7 @@ func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout metrics: m, timeSpentinLookupAndDelete: m.CreateTimeSpendInLookupAndDelete(), s: s, + udnEnabled: udnEnabled, } } @@ -105,6 +108,16 @@ func (m *MapTracer) evictFlows(ctx context.Context, forceGC bool, forwardFlows c var forwardingFlows []*model.Record flows := m.mapFetcher.LookupAndDeleteMap(m.metrics) elapsed := time.Since(currentTime) + udnCache := make(map[string]string) + if m.s != nil && m.udnEnabled { + udnsMap, err := m.s.GetInterfaceUDNs() + if err != nil { + mtlog.Errorf("failed to get udns to interfaces map : %v", err) + } else { + maps.Copy(udnCache, udnsMap) + mtlog.Tracef("GetInterfaceUDNS map: %v", udnCache) + } + } for flowKey, flowMetrics := range flows { forwardingFlows = append(forwardingFlows, model.NewRecord( flowKey, @@ -112,6 +125,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, forceGC bool, forwardFlows c currentTime, uint64(monotonicTimeNow), m.s, + udnCache, )) } m.mapFetcher.DeleteMapsStaleEntries(m.staleEntriesEvictTimeout) diff --git a/pkg/model/record.go b/pkg/model/record.go index 35a6c21233ff13f8a35cea93e9f175a2e5d28bb0..cf63f404a99fecc9bef9b972626eac65c6f73f88 100644 --- a/pkg/model/record.go +++ b/pkg/model/record.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "fmt" "io" - "maps" "net" "reflect" "time" @@ -70,7 +69,6 @@ type Record struct { // Calculated RTT which is set when record is created by calling NewRecord TimeFlowRtt time.Duration NetworkMonitorEventsMD []map[string]string - UdnsCache map[string]string } func NewRecord( @@ -79,6 +77,7 @@ func NewRecord( currentTime time.Time, monotonicCurrentTime uint64, s *ovnobserv.SampleDecoder, + udnsCache map[string]string, ) *Record { startDelta := time.Duration(monotonicCurrentTime - metrics.StartMonoTimeTs) endDelta := time.Duration(monotonicCurrentTime - metrics.EndMonoTimeTs) @@ -90,18 +89,15 @@ func NewRecord( TimeFlowEnd: currentTime.Add(-endDelta), AgentIP: agentIP, } - if s != nil { - record.UdnsCache = make(map[string]string) - } record.Interfaces = []IntfDirUdn{NewIntfDirUdn(interfaceNamer(int(metrics.IfIndexFirstSeen)), int(metrics.DirectionFirstSeen), - s, record.UdnsCache)} + udnsCache)} for i := uint8(0); i < record.Metrics.NbObservedIntf; i++ { record.Interfaces = append(record.Interfaces, NewIntfDirUdn( interfaceNamer(int(metrics.ObservedIntf[i])), int(metrics.ObservedDirection[i]), - s, record.UdnsCache, + udnsCache, )) } @@ -153,21 +149,10 @@ type IntfDirUdn struct { Udn string } -func NewIntfDirUdn(intf string, dir int, s *ovnobserv.SampleDecoder, cache map[string]string) IntfDirUdn { +func NewIntfDirUdn(intf string, dir int, cache map[string]string) IntfDirUdn { udn := "" - if s == nil { - return IntfDirUdn{Interface: intf, Direction: dir, Udn: udn} - } - - // Load UDN cache if empty if len(cache) == 0 { - m, err := s.GetInterfaceUDNs() - if err != nil { - recordLog.Errorf("failed to get udns to interfaces map : %v", err) - return IntfDirUdn{Interface: intf, Direction: dir, Udn: udn} - } - maps.Copy(cache, m) - recordLog.Tracef("GetInterfaceUDNS map: %v", cache) + return IntfDirUdn{Interface: intf, Direction: dir, Udn: udn} } // Look up the interface in the cache diff --git a/pkg/model/record_test.go b/pkg/model/record_test.go index 3aa7e70cc7704112df7e95671fb4f3571c22eeb2..a658787f0f2c081723631ceb94f44823808f69be 100644 --- a/pkg/model/record_test.go +++ b/pkg/model/record_test.go @@ -96,7 +96,7 @@ func TestParallelNewRecord(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - r := NewRecord(ebpf.BpfFlowId{}, &BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{}}, time.Now(), uint64(monotime.Now()), nil) + r := NewRecord(ebpf.BpfFlowId{}, &BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{}}, time.Now(), uint64(monotime.Now()), nil, map[string]string{}) assert.NotNil(t, r) }() }