Skip to content
Snippets Groups Projects
Unverified Commit 0c43b4a2 authored by Mohamed S. Mahmoud's avatar Mohamed S. Mahmoud Committed by GitHub
Browse files

NETOBSERV-1298: include duplicate list in the exported record (#217)

parent 06f4492e
No related branches found
No related tags found
No related merge requests found
...@@ -422,7 +422,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl ...@@ -422,7 +422,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
rbTracer.SendsTo(accounter) rbTracer.SendsTo(accounter)
if f.cfg.Deduper == DeduperFirstCome { if f.cfg.Deduper == DeduperFirstCome {
deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark), deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge),
node.ChannelBufferLen(f.cfg.BuffersLength)) node.ChannelBufferLen(f.cfg.BuffersLength))
mapTracer.SendsTo(deduper) mapTracer.SendsTo(deduper)
accounter.SendsTo(deduper) accounter.SendsTo(deduper)
......
...@@ -86,6 +86,8 @@ type Config struct { ...@@ -86,6 +86,8 @@ type Config struct {
DeduperFCExpiry time.Duration `env:"DEDUPER_FC_EXPIRY"` DeduperFCExpiry time.Duration `env:"DEDUPER_FC_EXPIRY"`
// DeduperJustMark will just mark duplicates (boolean field) instead of dropping them. // DeduperJustMark will just mark duplicates (boolean field) instead of dropping them.
DeduperJustMark bool `env:"DEDUPER_JUST_MARK"` DeduperJustMark bool `env:"DEDUPER_JUST_MARK"`
// DeduperMerge will merge duplicated flows and generate list of interfaces and direction pairs
DeduperMerge bool `env:"DEDUPER_MERGE" envDefault:"false"`
// Direction allows selecting which flows to trace according to its direction. Accepted values // Direction allows selecting which flows to trace according to its direction. Accepted values
// are "ingress", "egress" or "both" (default). // are "ingress", "egress" or "both" (default).
Direction string `env:"DIRECTION" envDefault:"both"` Direction string `env:"DIRECTION" envDefault:"both"`
......
...@@ -65,7 +65,12 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap { ...@@ -65,7 +65,12 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
if flow.Packets != 0 { if flow.Packets != 0 {
out["Packets"] = flow.Packets out["Packets"] = flow.Packets
} }
var interfaces []interface{}
var directions []interface{}
for _, entry := range flow.GetDupList() {
out["Interfaces"] = append([]interface{}{entry.Interface}, interfaces...)
out["FlowDirections"] = append([]interface{}{int(entry.Direction.Number())}, directions...)
}
ethType := ethernet.EtherType(flow.EthProtocol) ethType := ethernet.EtherType(flow.EthProtocol)
if ethType == ethernet.EtherTypeIPv4 || ethType == ethernet.EtherTypeIPv6 { if ethType == ethernet.EtherTypeIPv4 || ethType == ethernet.EtherTypeIPv6 {
out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr()) out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr())
......
...@@ -17,7 +17,13 @@ func TestPBFlowToMap(t *testing.T) { ...@@ -17,7 +17,13 @@ func TestPBFlowToMap(t *testing.T) {
someTime := time.Now() someTime := time.Now()
var someDuration time.Duration = 10000000 // 10ms var someDuration time.Duration = 10000000 // 10ms
flow := &pbflow.Record{ flow := &pbflow.Record{
Interface: "eth0", Interface: "eth0",
DupList: []*pbflow.DupMapEntry{
{
Interface: "eth0",
Direction: pbflow.Direction_EGRESS,
},
},
EthProtocol: 2048, EthProtocol: 2048,
Bytes: 456, Bytes: 456,
Packets: 123, Packets: 123,
...@@ -64,6 +70,7 @@ func TestPBFlowToMap(t *testing.T) { ...@@ -64,6 +70,7 @@ func TestPBFlowToMap(t *testing.T) {
delete(out, "TimeReceived") delete(out, "TimeReceived")
assert.Equal(t, config.GenericMap{ assert.Equal(t, config.GenericMap{
"FlowDirection": 1, "FlowDirection": 1,
"FlowDirections": []interface{}{1},
"Bytes": uint64(456), "Bytes": uint64(456),
"SrcAddr": "1.2.3.4", "SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8", "DstAddr": "5.6.7.8",
...@@ -78,6 +85,7 @@ func TestPBFlowToMap(t *testing.T) { ...@@ -78,6 +85,7 @@ func TestPBFlowToMap(t *testing.T) {
"Proto": uint32(6), "Proto": uint32(6),
"TimeFlowStartMs": someTime.UnixMilli(), "TimeFlowStartMs": someTime.UnixMilli(),
"TimeFlowEndMs": someTime.UnixMilli(), "TimeFlowEndMs": someTime.UnixMilli(),
"Interfaces": []interface{}{"eth0"},
"Interface": "eth0", "Interface": "eth0",
"AgentIP": "10.9.8.7", "AgentIP": "10.9.8.7",
"Flags": uint32(0x100), "Flags": uint32(0x100),
......
...@@ -57,7 +57,10 @@ func TestProtoConversion(t *testing.T) { ...@@ -57,7 +57,10 @@ func TestProtoConversion(t *testing.T) {
var r pbflow.Record var r pbflow.Record
require.NoError(t, proto.Unmarshal(wc.messages[0].Value, &r)) require.NoError(t, proto.Unmarshal(wc.messages[0].Value, &r))
assert.EqualValues(t, 3, r.EthProtocol) assert.EqualValues(t, 3, r.EthProtocol)
assert.EqualValues(t, 1, r.Direction) for _, e := range r.DupList {
assert.EqualValues(t, 1, e.Direction)
assert.Equal(t, "veth0", e.Interface)
}
assert.EqualValues(t, uint64(0xaabbccddeeff), r.DataLink.SrcMac) assert.EqualValues(t, uint64(0xaabbccddeeff), r.DataLink.SrcMac)
assert.EqualValues(t, uint64(0x112233445566), r.DataLink.DstMac) assert.EqualValues(t, uint64(0x112233445566), r.DataLink.DstMac)
assert.EqualValues(t, uint64(0xC0010203) /* 192.1.2.3 */, r.Network.SrcAddr.GetIpv4()) assert.EqualValues(t, uint64(0xC0010203) /* 192.1.2.3 */, r.Network.SrcAddr.GetIpv4())
...@@ -71,7 +74,6 @@ func TestProtoConversion(t *testing.T) { ...@@ -71,7 +74,6 @@ func TestProtoConversion(t *testing.T) {
assert.EqualValues(t, 789, r.Bytes) assert.EqualValues(t, 789, r.Bytes)
assert.EqualValues(t, 987, r.Packets) assert.EqualValues(t, 987, r.Packets)
assert.EqualValues(t, uint16(1), r.Flags) assert.EqualValues(t, uint16(1), r.Flags)
assert.Equal(t, "veth0", r.Interface)
assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("127.3.2.1")), wc.messages[0].Key[0:16]) assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("127.3.2.1")), wc.messages[0].Key[0:16])
assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("192.1.2.3")), wc.messages[0].Key[16:]) assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("192.1.2.3")), wc.messages[0].Key[16:])
} }
......
...@@ -71,7 +71,7 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record { ...@@ -71,7 +71,7 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
Duplicate: fr.Duplicate, Duplicate: fr.Duplicate,
AgentIp: agentIP(fr.AgentIP), AgentIp: agentIP(fr.AgentIP),
Flags: uint32(fr.Metrics.Flags), Flags: uint32(fr.Metrics.Flags),
Interface: string(fr.Interface), Interface: fr.Interface,
PktDropBytes: fr.Metrics.PktDrops.Bytes, PktDropBytes: fr.Metrics.PktDrops.Bytes,
PktDropPackets: uint64(fr.Metrics.PktDrops.Packets), PktDropPackets: uint64(fr.Metrics.PktDrops.Packets),
PktDropLatestFlags: uint32(fr.Metrics.PktDrops.LatestFlags), PktDropLatestFlags: uint32(fr.Metrics.PktDrops.LatestFlags),
...@@ -85,6 +85,13 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record { ...@@ -85,6 +85,13 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
if fr.Metrics.DnsRecord.Latency != 0 { if fr.Metrics.DnsRecord.Latency != 0 {
pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency) pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency)
} }
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
for _, m := range fr.DupList {
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
Interface: fr.Interface,
Direction: pbflow.Direction(m[fr.Interface]),
})
}
return &pbflowRecord return &pbflowRecord
} }
...@@ -135,6 +142,13 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record { ...@@ -135,6 +142,13 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
if fr.Metrics.DnsRecord.Latency != 0 { if fr.Metrics.DnsRecord.Latency != 0 {
pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency) pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency)
} }
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
for _, m := range fr.DupList {
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
Interface: fr.Interface,
Direction: pbflow.Direction(m[fr.Interface]),
})
}
return &pbflowRecord return &pbflowRecord
} }
......
...@@ -109,6 +109,7 @@ func TestEvict_MaxEntries(t *testing.T) { ...@@ -109,6 +109,7 @@ func TestEvict_MaxEntries(t *testing.T) {
}, },
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond), TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond), TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
DupList: make([]map[string]uint8, 0),
}, },
k2: { k2: {
RawRecord: RawRecord{ RawRecord: RawRecord{
...@@ -119,6 +120,7 @@ func TestEvict_MaxEntries(t *testing.T) { ...@@ -119,6 +120,7 @@ func TestEvict_MaxEntries(t *testing.T) {
}, },
TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond), TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond), TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond),
DupList: make([]map[string]uint8, 0),
}, },
}, received) }, received)
} }
...@@ -187,6 +189,7 @@ func TestEvict_Period(t *testing.T) { ...@@ -187,6 +189,7 @@ func TestEvict_Period(t *testing.T) {
}, },
TimeFlowStart: now.Add(-1000 + 123), TimeFlowStart: now.Add(-1000 + 123),
TimeFlowEnd: now.Add(-1000 + 789), TimeFlowEnd: now.Add(-1000 + 789),
DupList: make([]map[string]uint8, 0),
}, *records[0]) }, *records[0])
records = receiveTimeout(t, evictor) records = receiveTimeout(t, evictor)
require.Len(t, records, 1) require.Len(t, records, 1)
...@@ -203,6 +206,7 @@ func TestEvict_Period(t *testing.T) { ...@@ -203,6 +206,7 @@ func TestEvict_Period(t *testing.T) {
}, },
TimeFlowStart: now.Add(-1000 + 1123), TimeFlowStart: now.Add(-1000 + 1123),
TimeFlowEnd: now.Add(-1000 + 1456), TimeFlowEnd: now.Add(-1000 + 1456),
DupList: make([]map[string]uint8, 0),
}, *records[0]) }, *records[0])
// no more flows are evicted // no more flows are evicted
......
...@@ -29,6 +29,7 @@ type entry struct { ...@@ -29,6 +29,7 @@ type entry struct {
dnsRecord *ebpf.BpfDnsRecordT dnsRecord *ebpf.BpfDnsRecordT
ifIndex uint32 ifIndex uint32
expiryTime time.Time expiryTime time.Time
dupList *[]map[string]uint8
} }
// Dedupe receives flows and filters those belonging to duplicate interfaces. It will forward
...@@ -36,7 +37,7 @@ type entry struct { ...@@ -36,7 +37,7 @@ type entry struct {
// (no activity for it during the expiration time) // (no activity for it during the expiration time)
// The justMark argument tells that the deduper should not drop the duplicate flows but // The justMark argument tells that the deduper should not drop the duplicate flows but
// set their Duplicate field. // set their Duplicate field.
func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, out chan<- []*Record) { func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan []*Record, out chan<- []*Record) {
cache := &deduperCache{ cache := &deduperCache{
expire: expireTime, expire: expireTime,
entries: list.New(), entries: list.New(),
...@@ -47,7 +48,7 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o ...@@ -47,7 +48,7 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o
cache.removeExpired() cache.removeExpired()
fwd := make([]*Record, 0, len(records)) fwd := make([]*Record, 0, len(records))
for _, record := range records { for _, record := range records {
cache.checkDupe(record, justMark, &fwd) cache.checkDupe(record, justMark, mergeDup, &fwd)
} }
if len(fwd) > 0 { if len(fwd) > 0 {
out <- fwd out <- fwd
...@@ -57,7 +58,8 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o ...@@ -57,7 +58,8 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o
} }
// checkDupe checks whether the current record is already available and, if it is not, adds it to the fwd records list
func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) { func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Record) {
mergeEntry := make(map[string]uint8)
rk := r.Id rk := r.Id
// zeroes fields from key that should be ignored from the flow comparison // zeroes fields from key that should be ignored from the flow comparison
rk.IfIndex = 0 rk.IfIndex = 0
...@@ -86,6 +88,10 @@ func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) { ...@@ -86,6 +88,10 @@ func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) {
r.Duplicate = true r.Duplicate = true
*fwd = append(*fwd, r) *fwd = append(*fwd, r)
} }
if mergeDup {
mergeEntry[r.Interface] = r.Id.Direction
*fEntry.dupList = append(*fEntry.dupList, mergeEntry)
}
return return
} }
*fwd = append(*fwd, r) *fwd = append(*fwd, r)
...@@ -99,6 +105,11 @@ func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) { ...@@ -99,6 +105,11 @@ func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) {
ifIndex: r.Id.IfIndex, ifIndex: r.Id.IfIndex,
expiryTime: timeNow().Add(c.expire), expiryTime: timeNow().Add(c.expire),
} }
if mergeDup {
mergeEntry[r.Interface] = r.Id.Direction
r.DupList = append(r.DupList, mergeEntry)
e.dupList = &r.DupList
}
c.ifaces[rk] = c.entries.PushFront(&e) c.ifaces[rk] = c.entries.PushFront(&e)
*fwd = append(*fwd, r) *fwd = append(*fwd, r)
} }
......
...@@ -56,7 +56,7 @@ func TestDedupe(t *testing.T) { ...@@ -56,7 +56,7 @@ func TestDedupe(t *testing.T) {
input := make(chan []*Record, 100) input := make(chan []*Record, 100)
output := make(chan []*Record, 100) output := make(chan []*Record, 100)
go Dedupe(time.Minute, false)(input, output) go Dedupe(time.Minute, false, false)(input, output)
input <- []*Record{ input <- []*Record{
oneIf2, // record 1 at interface 2: should be accepted oneIf2, // record 1 at interface 2: should be accepted
...@@ -89,7 +89,7 @@ func TestDedupe_EvictFlows(t *testing.T) { ...@@ -89,7 +89,7 @@ func TestDedupe_EvictFlows(t *testing.T) {
input := make(chan []*Record, 100) input := make(chan []*Record, 100)
output := make(chan []*Record, 100) output := make(chan []*Record, 100)
go Dedupe(15*time.Second, false)(input, output) go Dedupe(15*time.Second, false, false)(input, output)
// Should only accept records 1 and 2, at interface 1 // Should only accept records 1 and 2, at interface 1
input <- []*Record{oneIf1, twoIf1, oneIf2} input <- []*Record{oneIf1, twoIf1, oneIf2}
...@@ -120,6 +120,21 @@ func TestDedupe_EvictFlows(t *testing.T) { ...@@ -120,6 +120,21 @@ func TestDedupe_EvictFlows(t *testing.T) {
receiveTimeout(t, output)) receiveTimeout(t, output))
} }
// TestDedupeMerge checks the deduper with merging enabled: the duplicate flow
// must not be forwarded, and the forwarded record's DupList must hold one entry
// for itself plus one entry for the duplicate that was merged into it.
func TestDedupeMerge(t *testing.T) {
	in := make(chan []*Record, 100)
	out := make(chan []*Record, 100)

	// justMark=false, mergeDup=true
	dedupe := Dedupe(time.Minute, false, true)
	go dedupe(in, out)

	batch := []*Record{
		oneIf2, // record 1 at interface 2: should be accepted
		oneIf1, // duplicate submitted after oneIf2: expected to be merged, not forwarded
	}
	in <- batch

	got := receiveTimeout(t, out)
	assert.Equal(t, []*Record{oneIf2}, got)
	assert.Equal(t, 2, len(oneIf2.DupList))
}
type timerMock struct { type timerMock struct {
now time.Time now time.Time
} }
......
...@@ -53,6 +53,7 @@ type Record struct { ...@@ -53,6 +53,7 @@ type Record struct {
AgentIP net.IP AgentIP net.IP
// Calculated RTT which is set when record is created by calling NewRecord // Calculated RTT which is set when record is created by calling NewRecord
TimeFlowRtt time.Duration TimeFlowRtt time.Duration
DupList []map[string]uint8
} }
func NewRecord( func NewRecord(
...@@ -78,6 +79,7 @@ func NewRecord( ...@@ -78,6 +79,7 @@ func NewRecord(
if metrics.DnsRecord.Latency != 0 { if metrics.DnsRecord.Latency != 0 {
record.DNSLatency = time.Duration(metrics.DnsRecord.Latency) record.DNSLatency = time.Duration(metrics.DnsRecord.Latency)
} }
record.DupList = make([]map[string]uint8, 0)
return &record return &record
} }
......
...@@ -142,8 +142,12 @@ func BenchmarkIPv4GRPCCommunication(b *testing.B) { ...@@ -142,8 +142,12 @@ func BenchmarkIPv4GRPCCommunication(b *testing.B) {
EthProtocol: 2048, EthProtocol: 2048,
Bytes: 456, Bytes: 456,
Flags: 1, Flags: 1,
DupList: []*pbflow.DupMapEntry{
Direction: pbflow.Direction_EGRESS, {
Interface: "eth0",
Direction: pbflow.Direction_EGRESS,
},
},
TimeFlowStart: timestamppb.Now(), TimeFlowStart: timestamppb.Now(),
TimeFlowEnd: timestamppb.Now(), TimeFlowEnd: timestamppb.Now(),
Network: &pbflow.Network{ Network: &pbflow.Network{
...@@ -196,10 +200,15 @@ func BenchmarkIPv6GRPCCommunication(b *testing.B) { ...@@ -196,10 +200,15 @@ func BenchmarkIPv6GRPCCommunication(b *testing.B) {
client := cc.Client() client := cc.Client()
f := &pbflow.Record{ f := &pbflow.Record{
EthProtocol: 2048, EthProtocol: 2048,
Bytes: 456, Bytes: 456,
Flags: 1, Flags: 1,
Direction: pbflow.Direction_EGRESS, DupList: []*pbflow.DupMapEntry{
{
Interface: "eth0",
Direction: pbflow.Direction_EGRESS,
},
},
TimeFlowStart: timestamppb.Now(), TimeFlowStart: timestamppb.Now(),
TimeFlowEnd: timestamppb.Now(), TimeFlowEnd: timestamppb.Now(),
Network: &pbflow.Network{ Network: &pbflow.Network{
......
This diff is collapsed.
...@@ -18,6 +18,10 @@ message Records { ...@@ -18,6 +18,10 @@ message Records {
repeated Record entries = 1; repeated Record entries = 1;
} }
// DupMapEntry is one interface/direction pair. Record.dup_list carries a
// repeated list of these entries.
message DupMapEntry {
// name of the interface for this entry
string interface = 1;
// flow direction associated with that interface
Direction direction = 2;
}
message Record { message Record {
// protocol as defined by ETH_P_* in linux/if_ether.h // protocol as defined by ETH_P_* in linux/if_ether.h
// https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_ether.h // https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_ether.h
...@@ -34,7 +38,7 @@ message Record { ...@@ -34,7 +38,7 @@ message Record {
uint64 bytes = 8; uint64 bytes = 8;
uint64 packets = 9; uint64 packets = 9;
string interface = 10; string interface = 10;
// if true, the same flow has been recorded via another interface. // if true, the same flow has been recorded via another interface.
// From all the duplicate flows, one will set this value to false and the rest will be true. // From all the duplicate flows, one will set this value to false and the rest will be true.
bool duplicate = 11; bool duplicate = 11;
...@@ -54,6 +58,7 @@ message Record { ...@@ -54,6 +58,7 @@ message Record {
google.protobuf.Duration dns_latency = 23; google.protobuf.Duration dns_latency = 23;
google.protobuf.Duration time_flow_rtt = 24; google.protobuf.Duration time_flow_rtt = 24;
uint32 dns_errno = 25; uint32 dns_errno = 25;
repeated DupMapEntry dup_list = 26;
} }
message DataLink { message DataLink {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment