Unverified commit faf274e0, authored by Mohamed S. Mahmoud and committed by GitHub

Revert "change aggregation flow map to hashmap instead perCPU hashmap (#118)" (#172)


This reverts commit b6e2b873.

fix

Signed-off-by: msherif1234 <mmahmoud@redhat.com>
parent a7d4eecc
@@ -75,6 +75,11 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         aggregate_flow->packets += 1;
         aggregate_flow->bytes += skb->len;
         aggregate_flow->end_mono_time_ts = pkt.current_ts;
+        // it might happen that start_mono_time hasn't been set due to
+        // the way the percpu hashmap deals with concurrent map entries
+        if (aggregate_flow->start_mono_time_ts == 0) {
+            aggregate_flow->start_mono_time_ts = pkt.current_ts;
+        }
         aggregate_flow->flags |= pkt.flags;
         // Does not matter the gate. Will be zero if not enabled.
...
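A note on the comment above: with a BPF_MAP_TYPE_PERCPU_HASH, every possible CPU owns its own copy of the value for a given key, and a userspace lookup returns one value per CPU, including zero-valued slots for CPUs that never updated the flow. The standalone sketch below (hypothetical pin path and stand-in struct layouts; the real agent uses its generated BpfFlowId/BpfFlowMetrics types and reaches the map through the FlowFetcher's loaded objects) shows that view using the cilium/ebpf library:

package main

import (
    "fmt"
    "log"

    "github.com/cilium/ebpf"
)

// FlowID and FlowMetrics are stand-ins for the generated BpfFlowId/BpfFlowMetrics
// types; their layout must mirror the kernel-side flow_id/flow_metrics structs
// for the lookup to decode correctly.
type FlowID struct {
    SrcPort uint16
    DstPort uint16
    IfIndex uint32
}

type FlowMetrics struct {
    Packets         uint32
    Bytes           uint64
    StartMonoTimeTs uint64
    EndMonoTimeTs   uint64
    Flags           uint16
}

func main() {
    // Hypothetical pin path, for illustration only.
    m, err := ebpf.LoadPinnedMap("/sys/fs/bpf/aggregated_flows", nil)
    if err != nil {
        log.Fatalf("loading pinned map: %v", err)
    }
    defer m.Close()

    var id FlowID
    var perCPU []FlowMetrics
    it := m.Iterate()
    // For a per-CPU hash map, Next fills one slice element per possible CPU.
    for it.Next(&id, &perCPU) {
        for cpu, v := range perCPU {
            // CPUs that never updated this flow hold a zero-valued slot, which is
            // why start_mono_time_ts == 0 must be tolerated both in the BPF program
            // and when aggregating in userspace.
            if v.StartMonoTimeTs == 0 && v.EndMonoTimeTs == 0 {
                continue
            }
            fmt.Printf("flow %v cpu=%d packets=%d bytes=%d\n", id, cpu, v.Packets, v.Bytes)
        }
    }
    if err := it.Err(); err != nil {
        log.Fatalf("iterating flows: %v", err)
    }
}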
@@ -11,7 +11,7 @@ struct {
 // Key: the flow identifier. Value: the flow metrics for that identifier.
 struct {
-    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(type, BPF_MAP_TYPE_PERCPU_HASH);
     __type(key, flow_id);
     __type(value, flow_metrics);
     __uint(max_entries, 1 << 24);
...
@@ -253,12 +253,13 @@ static inline long pkt_drop_lookup_and_update_flow(struct sk_buff *skb, flow_id
                                                    enum skb_drop_reason reason) {
     flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
     if (aggregate_flow != NULL) {
+        aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
         aggregate_flow->pkt_drops.packets += 1;
         aggregate_flow->pkt_drops.bytes += skb->len;
         aggregate_flow->pkt_drops.latest_state = state;
         aggregate_flow->pkt_drops.latest_flags = flags;
         aggregate_flow->pkt_drops.latest_drop_cause = reason;
-        long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_ANY);
+        long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_EXIST);
         if (trace_messages && ret != 0) {
             bpf_printk("error packet drop updating flow %d\n", ret);
         }
...
@@ -11,7 +11,7 @@ flowchart TD
     E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
     style E fill:#990
-    E --> |"polls<br/>HashMap"| M(flow.MapTracer)
+    E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
     RB --> |chan *flow.Record| ACC(flow.Accounter)
     RB -.-> |flushes| M
     ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
...
@@ -72,7 +72,7 @@ func main() {
     for records := range receivedRecords {
         for _, record := range records.Entries {
             if record.EthProtocol == ipv6 {
-                log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
+                log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
                     ipProto[record.EthProtocol],
                     record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
                     record.Interface,
@@ -92,9 +92,12 @@ func main() {
                     record.GetDnsFlags(),
                     record.DnsLatency.AsDuration().Milliseconds(),
                     record.TimeFlowRtt.AsDuration().Nanoseconds(),
+                    record.GetPktDropPackets(),
+                    record.GetPktDropBytes(),
+                    record.GetPktDropLatestDropCause(),
                 )
             } else {
-                log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
+                log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
                     ipProto[record.EthProtocol],
                     record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
                     record.Interface,
@@ -114,6 +117,9 @@ func main() {
                     record.GetDnsFlags(),
                     record.DnsLatency.AsDuration().Milliseconds(),
                     record.TimeFlowRtt.AsDuration().Nanoseconds(),
+                    record.GetPktDropPackets(),
+                    record.GetPktDropBytes(),
+                    record.GetPktDropLatestDropCause(),
                 )
             }
         }
...
@@ -78,7 +78,7 @@ type ebpfFlowFetcher interface {
     io.Closer
     Register(iface ifaces.Interface) error
-    LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
+    LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
     DeleteMapsStaleEntries(timeOut time.Duration)
     ReadRingBuf() (ringbuf.Record, error)
 }
...
@@ -49,6 +49,11 @@ var (
         DstPort: 456,
         IfIndex: 3,
     }
+    key1Dupe = ebpf.BpfFlowId{
+        SrcPort: 123,
+        DstPort: 456,
+        IfIndex: 4,
+    }
     key2 = ebpf.BpfFlowId{
         SrcPort: 333,
@@ -66,7 +71,7 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
     })
     exported := export.Get(t, timeout)
-    assert.Len(t, exported, 1)
+    assert.Len(t, exported, 2)
     receivedKeys := map[ebpf.BpfFlowId]struct{}{}
@@ -76,11 +81,21 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
         receivedKeys[f.Id] = struct{}{}
         switch f.Id {
         case key1:
-            assert.EqualValues(t, 3, f.Metrics.Packets)
-            assert.EqualValues(t, 44, f.Metrics.Bytes)
+            assert.EqualValues(t, 4, f.Metrics.Packets)
+            assert.EqualValues(t, 66, f.Metrics.Bytes)
             assert.False(t, f.Duplicate)
             assert.Equal(t, "foo", f.Interface)
             key1Flows = append(key1Flows, f)
+        case key1Dupe:
+            assert.EqualValues(t, 4, f.Metrics.Packets)
+            assert.EqualValues(t, 66, f.Metrics.Bytes)
+            assert.False(t, f.Duplicate)
+            assert.Equal(t, "bar", f.Interface)
+            key1Flows = append(key1Flows, f)
+        case key2:
+            assert.EqualValues(t, 7, f.Metrics.Packets)
+            assert.EqualValues(t, 33, f.Metrics.Bytes)
+            assert.False(t, f.Duplicate)
         }
     }
     assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows)
@@ -97,22 +112,33 @@ func TestFlowsAgent_DeduplicationJustMark(t *testing.T) {
     exported := export.Get(t, timeout)
     receivedKeys := map[ebpf.BpfFlowId]struct{}{}
-    assert.Len(t, exported, 1)
+    assert.Len(t, exported, 3)
     duplicates := 0
     for _, f := range exported {
         require.NotContains(t, receivedKeys, f.Id)
         receivedKeys[f.Id] = struct{}{}
         switch f.Id {
         case key1:
-            assert.EqualValues(t, 3, f.Metrics.Packets)
-            assert.EqualValues(t, 44, f.Metrics.Bytes)
+            assert.EqualValues(t, 4, f.Metrics.Packets)
+            assert.EqualValues(t, 66, f.Metrics.Bytes)
             if f.Duplicate {
                 duplicates++
             }
             assert.Equal(t, "foo", f.Interface)
+        case key1Dupe:
+            assert.EqualValues(t, 4, f.Metrics.Packets)
+            assert.EqualValues(t, 66, f.Metrics.Bytes)
+            if f.Duplicate {
+                duplicates++
+            }
+            assert.Equal(t, "bar", f.Interface)
+        case key2:
+            assert.EqualValues(t, 7, f.Metrics.Packets)
+            assert.EqualValues(t, 33, f.Metrics.Bytes)
+            assert.False(t, f.Duplicate)
         }
     }
-    assert.Equalf(t, 0, duplicates, "exported flows should have only one duplicate: %#v", exported)
+    assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported)
 }
 func TestFlowsAgent_Deduplication_None(t *testing.T) {
@@ -123,7 +149,7 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
     })
     exported := export.Get(t, timeout)
-    assert.Len(t, exported, 1)
+    assert.Len(t, exported, 3)
     receivedKeys := map[ebpf.BpfFlowId]struct{}{}
     var key1Flows []*flow.Record
@@ -132,14 +158,24 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
         receivedKeys[f.Id] = struct{}{}
         switch f.Id {
         case key1:
-            assert.EqualValues(t, 3, f.Metrics.Packets)
-            assert.EqualValues(t, 44, f.Metrics.Bytes)
+            assert.EqualValues(t, 4, f.Metrics.Packets)
+            assert.EqualValues(t, 66, f.Metrics.Bytes)
             assert.False(t, f.Duplicate)
             assert.Equal(t, "foo", f.Interface)
             key1Flows = append(key1Flows, f)
+        case key1Dupe:
+            assert.EqualValues(t, 4, f.Metrics.Packets)
+            assert.EqualValues(t, 66, f.Metrics.Bytes)
+            assert.False(t, f.Duplicate)
+            assert.Equal(t, "bar", f.Interface)
+            key1Flows = append(key1Flows, f)
+        case key2:
+            assert.EqualValues(t, 7, f.Metrics.Packets)
+            assert.EqualValues(t, 33, f.Metrics.Bytes)
+            assert.False(t, f.Duplicate)
         }
     }
-    assert.Lenf(t, key1Flows, 1, "both key1 flows should have been forwarded: %#v", key1Flows)
+    assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows)
 }
 func TestFlowsAgent_Decoration(t *testing.T) {
@@ -149,7 +185,7 @@ func TestFlowsAgent_Decoration(t *testing.T) {
     })
     exported := export.Get(t, timeout)
-    assert.Len(t, exported, 1)
+    assert.Len(t, exported, 3)
     // Tests that the decoration stage has been properly executed. It should
     // add the interface name and the agent IP
@@ -183,10 +219,17 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
     })
     now := uint64(monotime.Now())
-    key1Metrics := ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}
-    ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{
-        key1: &key1Metrics,
+    key1Metrics := []ebpf.BpfFlowMetrics{
+        {Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000},
+        {Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000},
+    }
+    key2Metrics := []ebpf.BpfFlowMetrics{
+        {Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000},
+    }
+    ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{
+        key1:     key1Metrics,
+        key1Dupe: key1Metrics,
+        key2:     key2Metrics,
     })
     return export
 }
...
@@ -377,27 +377,28 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
 // TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
 // Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
 // Race conditions here causes that some flows are lost in high-load scenarios
-func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]*BpfFlowMetrics {
+func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics {
     flowMap := m.objects.AggregatedFlows
     iterator := flowMap.Iterate()
-    var flow = make(map[BpfFlowId]*BpfFlowMetrics, m.cacheMaxSize)
+    var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
     var id BpfFlowId
-    var metric BpfFlowMetrics
+    var metrics []BpfFlowMetrics
     // Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
     // TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
-    for iterator.Next(&id, &metric) {
+    for iterator.Next(&id, &metrics) {
         if err := flowMap.Delete(id); err != nil {
             log.WithError(err).WithField("flowId", id).
                 Warnf("couldn't delete flow entry")
         }
-        metricPtr := new(BpfFlowMetrics)
-        *metricPtr = metric
-        flow[id] = metricPtr
+        // We observed that the eBPF PerCPU map might insert the same key multiple times
+        // (probably due to race conditions), so we need to re-join the metrics in userspace.
+        // TODO: instrument how many times a key is repeated within the same eviction
+        flows[id] = append(flows[id], metrics...)
     }
-    return flow
+    return flows
 }
 // DeleteMapsStaleEntries Look for any stale entries in the features maps and delete them
...
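The TODO in the hunk above is left open in this commit. As a rough illustration only (hypothetical helper, not part of the change), a function like the following could estimate how often a key was returned more than once in a single eviction, given that each occurrence contributes one metrics slot per possible CPU:

// keyRepetitions is a sketch of the TODO above, written against the same
// package types used by LookupAndDeleteMap. Each time the iterator yields a
// key, one BpfFlowMetrics slot per possible CPU is appended, so dividing the
// slice length by the CPU count approximates how many times the key appeared
// in this eviction cycle.
func keyRepetitions(flows map[BpfFlowId][]BpfFlowMetrics, possibleCPUs int) map[BpfFlowId]int {
    reps := make(map[BpfFlowId]int, len(flows))
    if possibleCPUs <= 0 {
        return reps
    }
    for id, perCPU := range flows {
        if n := len(perCPU) / possibleCPUs; n > 1 {
            reps[id] = n
        }
    }
    return reps
}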
@@ -65,7 +65,9 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
                 alog.Debug("exiting account routine")
                 return
             }
-            if _, ok := c.entries[record.Id]; !ok {
+            if stored, ok := c.entries[record.Id]; ok {
+                Accumulate(stored, &record.Metrics)
+            } else {
                 if len(c.entries) >= c.maxEntries {
                     evictingEntries := c.entries
                     c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}
...
@@ -104,11 +104,11 @@ func TestEvict_MaxEntries(t *testing.T) {
             RawRecord: RawRecord{
                 Id: k1,
                 Metrics: ebpf.BpfFlowMetrics{
-                    Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1,
+                    Bytes: 444, Packets: 2, StartMonoTimeTs: 123, EndMonoTimeTs: 789, Flags: 1,
                 },
             },
             TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
-            TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond),
+            TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
         },
         k2: {
             RawRecord: RawRecord{
@@ -178,15 +178,15 @@ func TestEvict_Period(t *testing.T) {
         RawRecord: RawRecord{
             Id: k1,
             Metrics: ebpf.BpfFlowMetrics{
-                Bytes: 10,
-                Packets: 1,
+                Bytes: 30,
+                Packets: 3,
                 StartMonoTimeTs: 123,
-                EndMonoTimeTs: 123,
+                EndMonoTimeTs: 789,
                 Flags: 1,
             },
         },
         TimeFlowStart: now.Add(-1000 + 123),
-        TimeFlowEnd: now.Add(-1000 + 123),
+        TimeFlowEnd: now.Add(-1000 + 789),
     }, *records[0])
     records = receiveTimeout(t, evictor)
     require.Len(t, records, 1)
@@ -194,15 +194,15 @@ func TestEvict_Period(t *testing.T) {
         RawRecord: RawRecord{
             Id: k1,
             Metrics: ebpf.BpfFlowMetrics{
-                Bytes: 10,
-                Packets: 1,
+                Bytes: 20,
+                Packets: 2,
                 StartMonoTimeTs: 1123,
-                EndMonoTimeTs: 1123,
+                EndMonoTimeTs: 1456,
                 Flags: 1,
             },
         },
         TimeFlowStart: now.Add(-1000 + 1123),
-        TimeFlowEnd: now.Add(-1000 + 1123),
+        TimeFlowEnd: now.Add(-1000 + 1456),
     }, *records[0])
     // no more flows are evicted
...
@@ -81,6 +81,38 @@ func NewRecord(
     return &record
 }
+func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) {
+    // time == 0 if the value has not been yet set
+    if r.StartMonoTimeTs == 0 || r.StartMonoTimeTs > src.StartMonoTimeTs {
+        r.StartMonoTimeTs = src.StartMonoTimeTs
+    }
+    if r.EndMonoTimeTs == 0 || r.EndMonoTimeTs < src.EndMonoTimeTs {
+        r.EndMonoTimeTs = src.EndMonoTimeTs
+    }
+    r.Bytes += src.Bytes
+    r.Packets += src.Packets
+    r.Flags |= src.Flags
+    // Accumulate Drop statistics
+    r.PktDrops.Bytes += src.PktDrops.Bytes
+    r.PktDrops.Packets += src.PktDrops.Packets
+    r.PktDrops.LatestFlags |= src.PktDrops.LatestFlags
+    if src.PktDrops.LatestDropCause != 0 {
+        r.PktDrops.LatestDropCause = src.PktDrops.LatestDropCause
+    }
+    // Accumulate DNS
+    r.DnsRecord.Flags |= src.DnsRecord.Flags
+    if src.DnsRecord.Id != 0 {
+        r.DnsRecord.Id = src.DnsRecord.Id
+    }
+    if r.DnsRecord.Latency < src.DnsRecord.Latency {
+        r.DnsRecord.Latency = src.DnsRecord.Latency
+    }
+    // Accumulate RTT
+    if r.FlowRtt < src.FlowRtt {
+        r.FlowRtt = src.FlowRtt
+    }
+}
 // IP returns the net.IP equivalent object
 func IP(ia IPAddr) net.IP {
     return ia[:]
...
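To make the merge semantics of Accumulate concrete: the start timestamp takes the earliest non-zero value, the end timestamp the latest, counters are summed, and flags are OR-ed. A small Example-style test with illustrative values (assuming the same pkg/flow package and ebpf import alias used by the existing tests) could look like this:

// Sketch of an Example test for Accumulate; the metric values are made up.
func ExampleAccumulate() {
    total := &ebpf.BpfFlowMetrics{
        Packets: 3, Bytes: 44, StartMonoTimeTs: 1000, EndMonoTimeTs: 5000, Flags: 0x02,
    }
    fromOtherCPU := &ebpf.BpfFlowMetrics{
        Packets: 1, Bytes: 22, StartMonoTimeTs: 500, EndMonoTimeTs: 6000, Flags: 0x10,
    }
    Accumulate(total, fromOtherCPU)
    // earliest start, latest end, summed counters, OR-ed flags
    fmt.Println(total.Packets, total.Bytes, total.StartMonoTimeTs, total.EndMonoTimeTs, total.Flags)
    // Output: 4 66 500 6000 18
}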
@@ -27,7 +27,7 @@ type MapTracer struct {
 }
 type mapFetcher interface {
-    LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
+    LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
     DeleteMapsStaleEntries(timeOut time.Duration)
 }
@@ -96,7 +96,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
     var forwardingFlows []*Record
     laterFlowNs := uint64(0)
     for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() {
-        aggregatedMetrics := flowMetrics
+        aggregatedMetrics := m.aggregate(flowMetrics)
         // we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored)
         if aggregatedMetrics.EndMonoTimeTs == 0 {
             continue
@@ -126,3 +126,21 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
     }
     mtlog.Debugf("%d flows evicted", len(forwardingFlows))
 }
+func (m *MapTracer) aggregate(metrics []ebpf.BpfFlowMetrics) *ebpf.BpfFlowMetrics {
+    if len(metrics) == 0 {
+        mtlog.Warn("invoked aggregate with no values")
+        return &ebpf.BpfFlowMetrics{}
+    }
+    aggr := &ebpf.BpfFlowMetrics{}
+    for _, mt := range metrics {
+        // eBPF hashmap values are not zeroed when the entry is removed. Because of that,
+        // we might receive entries from previous collect-eviction timeslots.
+        // We need to check the flow time and discard old flows.
+        if mt.StartMonoTimeTs <= m.lastEvictionNs || mt.EndMonoTimeTs <= m.lastEvictionNs {
+            continue
+        }
+        Accumulate(aggr, &mt)
+    }
+    return aggr
+}
@@ -11,25 +11,36 @@ import (
 func TestPacketAggregation(t *testing.T) {
     type testCase struct {
-        input    ebpf.BpfFlowMetrics
+        input    []ebpf.BpfFlowMetrics
         expected ebpf.BpfFlowMetrics
     }
     tcs := []testCase{{
-        input: ebpf.BpfFlowMetrics{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1},
+        input: []ebpf.BpfFlowMetrics{
+            {Packets: 0, Bytes: 0, StartMonoTimeTs: 0, EndMonoTimeTs: 0, Flags: 1},
+            {Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1},
+            {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
+            {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
+        },
         expected: ebpf.BpfFlowMetrics{
             Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1,
         },
     }, {
-        input: ebpf.BpfFlowMetrics{Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1},
+        input: []ebpf.BpfFlowMetrics{
+            {Packets: 0x3, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1},
+            {Packets: 0x2, Bytes: 0x8c, StartMonoTimeTs: 0x17f3e9633a7f, EndMonoTimeTs: 0x17f3e96f164e, Flags: 1},
+            {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
+            {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
+        },
         expected: ebpf.BpfFlowMetrics{
-            Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1,
+            Packets: 0x5, Bytes: 0x5c4 + 0x8c, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1,
         },
     }}
+    ft := MapTracer{}
     for i, tc := range tcs {
         t.Run(fmt.Sprint(i), func(t *testing.T) {
             assert.Equal(t,
                 tc.expected,
-                tc.input)
+                *ft.aggregate(tc.input))
         })
     }
 }
...
@@ -14,14 +14,14 @@ import (
 // TracerFake fakes the kernel-side eBPF map structures for testing
 type TracerFake struct {
     interfaces map[ifaces.Interface]struct{}
-    mapLookups chan map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
+    mapLookups chan map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
     ringBuf    chan ringbuf.Record
 }
 func NewTracerFake() *TracerFake {
     return &TracerFake{
         interfaces: map[ifaces.Interface]struct{}{},
-        mapLookups: make(chan map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics, 100),
+        mapLookups: make(chan map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics, 100),
         ringBuf:    make(chan ringbuf.Record, 100),
     }
 }
@@ -34,12 +34,12 @@ func (m *TracerFake) Register(iface ifaces.Interface) error {
     return nil
 }
-func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics {
+func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics {
     select {
     case r := <-m.mapLookups:
         return r
     default:
-        return map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}
+        return map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{}
     }
 }
@@ -50,7 +50,7 @@ func (m *TracerFake) ReadRingBuf() (ringbuf.Record, error) {
     return <-m.ringBuf, nil
 }
-func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics) {
+func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics) {
     m.mapLookups <- results
 }
...