Skip to content
Snippets Groups Projects
Unverified Commit 03bb6a38 authored by Mohamed S. Mahmoud's avatar Mohamed S. Mahmoud Committed by GitHub
Browse files

Interface name wasn't populated yet so add func to find ifname (#233)

parent 6f1d70a9
No related branches found
No related tags found
No related merge requests found
...@@ -66,11 +66,17 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap { ...@@ -66,11 +66,17 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
out["Packets"] = flow.Packets out["Packets"] = flow.Packets
} }
var interfaces []interface{} var interfaces []interface{}
var directions []interface{} var flowDirections []interface{}
for _, entry := range flow.GetDupList() {
out["Interfaces"] = append([]interface{}{entry.Interface}, interfaces...) if len(flow.GetDupList()) != 0 {
out["FlowDirections"] = append([]interface{}{int(entry.Direction.Number())}, directions...) for _, entry := range flow.GetDupList() {
interfaces = append(interfaces, entry.Interface)
flowDirections = append(flowDirections, entry.Direction)
}
out["Interfaces"] = interfaces
out["FlowDirections"] = flowDirections
} }
ethType := ethernet.EtherType(flow.EthProtocol) ethType := ethernet.EtherType(flow.EthProtocol)
if ethType == ethernet.EtherTypeIPv4 || ethType == ethernet.EtherTypeIPv6 { if ethType == ethernet.EtherTypeIPv4 || ethType == ethernet.EtherTypeIPv6 {
out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr()) out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr())
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
...@@ -70,7 +71,7 @@ func TestPBFlowToMap(t *testing.T) { ...@@ -70,7 +71,7 @@ func TestPBFlowToMap(t *testing.T) {
delete(out, "TimeReceived") delete(out, "TimeReceived")
assert.Equal(t, config.GenericMap{ assert.Equal(t, config.GenericMap{
"FlowDirection": 1, "FlowDirection": 1,
"FlowDirections": []interface{}{1}, "FlowDirections": []interface{}{pbflow.Direction(1)},
"Bytes": uint64(456), "Bytes": uint64(456),
"SrcAddr": "1.2.3.4", "SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8", "DstAddr": "5.6.7.8",
......
...@@ -85,12 +85,16 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record { ...@@ -85,12 +85,16 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
if fr.Metrics.DnsRecord.Latency != 0 { if fr.Metrics.DnsRecord.Latency != 0 {
pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency) pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency)
} }
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0) if len(fr.DupList) != 0 {
for _, m := range fr.DupList { pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{ for _, m := range fr.DupList {
Interface: fr.Interface, for key, value := range m {
Direction: pbflow.Direction(m[fr.Interface]), pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
}) Interface: key,
Direction: pbflow.Direction(value),
})
}
}
} }
return &pbflowRecord return &pbflowRecord
} }
...@@ -142,12 +146,16 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record { ...@@ -142,12 +146,16 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
if fr.Metrics.DnsRecord.Latency != 0 { if fr.Metrics.DnsRecord.Latency != 0 {
pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency) pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency)
} }
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0) if len(fr.DupList) != 0 {
for _, m := range fr.DupList { pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{ for _, m := range fr.DupList {
Interface: fr.Interface, for key, value := range m {
Direction: pbflow.Direction(m[fr.Interface]), pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
}) Interface: key,
Direction: pbflow.Direction(value),
})
}
}
} }
return &pbflowRecord return &pbflowRecord
} }
......
...@@ -2,11 +2,13 @@ package flow ...@@ -2,11 +2,13 @@ package flow
import ( import (
"container/list" "container/list"
"reflect"
"time" "time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
) )
var dlog = logrus.WithField("component", "flow/Deduper") var dlog = logrus.WithField("component", "flow/Deduper")
...@@ -93,8 +95,17 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec ...@@ -93,8 +95,17 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
*fwd = append(*fwd, r) *fwd = append(*fwd, r)
} }
if mergeDup { if mergeDup {
mergeEntry[r.Interface] = r.Id.Direction ifName := utils.GetInterfaceName(r.Id.IfIndex)
*fEntry.dupList = append(*fEntry.dupList, mergeEntry) mergeEntry[ifName] = r.Id.Direction
if dupEntryNew(*fEntry.dupList, mergeEntry) {
*fEntry.dupList = append(*fEntry.dupList, mergeEntry)
dlog.Debugf("merge list entries dump:")
for _, entry := range *fEntry.dupList {
for k, v := range entry {
dlog.Debugf("interface %s dir %d", k, v)
}
}
}
} }
return return
} }
...@@ -111,7 +122,8 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec ...@@ -111,7 +122,8 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
expiryTime: timeNow().Add(c.expire), expiryTime: timeNow().Add(c.expire),
} }
if mergeDup { if mergeDup {
mergeEntry[r.Interface] = r.Id.Direction ifName := utils.GetInterfaceName(r.Id.IfIndex)
mergeEntry[ifName] = r.Id.Direction
r.DupList = append(r.DupList, mergeEntry) r.DupList = append(r.DupList, mergeEntry)
e.dupList = &r.DupList e.dupList = &r.DupList
} }
...@@ -119,6 +131,15 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec ...@@ -119,6 +131,15 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
*fwd = append(*fwd, r) *fwd = append(*fwd, r)
} }
func dupEntryNew(dupList []map[string]uint8, mergeEntry map[string]uint8) bool {
for _, entry := range dupList {
if reflect.DeepEqual(entry, mergeEntry) {
return false
}
}
return true
}
func (c *deduperCache) removeExpired() { func (c *deduperCache) removeExpired() {
now := timeNow() now := timeNow()
ele := c.entries.Back() ele := c.entries.Back()
...@@ -126,7 +147,9 @@ func (c *deduperCache) removeExpired() { ...@@ -126,7 +147,9 @@ func (c *deduperCache) removeExpired() {
for ele != nil && now.After(ele.Value.(*entry).expiryTime) { for ele != nil && now.After(ele.Value.(*entry).expiryTime) {
evicted++ evicted++
c.entries.Remove(ele) c.entries.Remove(ele)
delete(c.ifaces, *ele.Value.(*entry).key) fEntry := ele.Value.(*entry)
fEntry.dupList = nil
delete(c.ifaces, *fEntry.key)
ele = c.entries.Back() ele = c.entries.Back()
} }
if evicted > 0 { if evicted > 0 {
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
) )
var ( var (
...@@ -151,6 +152,19 @@ func TestDedupeMerge(t *testing.T) { ...@@ -151,6 +152,19 @@ func TestDedupeMerge(t *testing.T) {
deduped := receiveTimeout(t, output) deduped := receiveTimeout(t, output)
assert.Equal(t, []*Record{oneIf2}, deduped) assert.Equal(t, []*Record{oneIf2}, deduped)
assert.Equal(t, 2, len(oneIf2.DupList)) assert.Equal(t, 2, len(oneIf2.DupList))
expectedMap := []map[string]uint8{
{
utils.GetInterfaceName(oneIf2.Id.IfIndex): oneIf2.Id.Direction,
},
{
utils.GetInterfaceName(oneIf1.Id.IfIndex): oneIf1.Id.Direction,
},
}
for k, v := range oneIf2.DupList {
assert.Equal(t, expectedMap[k], v)
}
} }
type timerMock struct { type timerMock struct {
......
...@@ -91,3 +91,11 @@ func utsnameStr[T int8 | uint8](in []T) string { ...@@ -91,3 +91,11 @@ func utsnameStr[T int8 | uint8](in []T) string {
} }
return string(out) return string(out)
} }
func GetInterfaceName(ifIndex uint32) string {
iface, err := net.InterfaceByIndex(int(ifIndex))
if err != nil {
return ""
}
return iface.Name
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment