diff --git a/bpf/flows.c b/bpf/flows.c index 373bc561fc578da3b5de480f4cff0382228cb0df..8a4aa232799113118ae47177123ac7b173a33c46 100644 --- a/bpf/flows.c +++ b/bpf/flows.c @@ -75,6 +75,11 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { aggregate_flow->packets += 1; aggregate_flow->bytes += skb->len; aggregate_flow->end_mono_time_ts = pkt.current_ts; + // it might happen that start_mono_time hasn't been set due to + // the way percpu hashmap deal with concurrent map entries + if (aggregate_flow->start_mono_time_ts == 0) { + aggregate_flow->start_mono_time_ts = pkt.current_ts; + } aggregate_flow->flags |= pkt.flags; // Does not matter the gate. Will be zero if not enabled. diff --git a/bpf/maps_definition.h b/bpf/maps_definition.h index ca07543aac51bc3c42397d5b69ca9b75f303d367..8b22494c7019f9b6a485681c5de3dce967691aaa 100644 --- a/bpf/maps_definition.h +++ b/bpf/maps_definition.h @@ -11,7 +11,7 @@ struct { // Key: the flow identifier. Value: the flow metrics for that identifier. struct { - __uint(type, BPF_MAP_TYPE_HASH); + __uint(type, BPF_MAP_TYPE_PERCPU_HASH); __type(key, flow_id); __type(value, flow_metrics); __uint(max_entries, 1 << 24); diff --git a/bpf/utils.h b/bpf/utils.h index ce05dd13c3767c250c50090ed0615b28f6ab74b6..f0932e13113f0d1a240baf6aa9f6a020f886e642 100644 --- a/bpf/utils.h +++ b/bpf/utils.h @@ -253,12 +253,13 @@ static inline long pkt_drop_lookup_and_update_flow(struct sk_buff *skb, flow_id enum skb_drop_reason reason) { flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id); if (aggregate_flow != NULL) { + aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns(); aggregate_flow->pkt_drops.packets += 1; aggregate_flow->pkt_drops.bytes += skb->len; aggregate_flow->pkt_drops.latest_state = state; aggregate_flow->pkt_drops.latest_flags = flags; aggregate_flow->pkt_drops.latest_drop_cause = reason; - long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_ANY); + long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_EXIST); if (trace_messages && ret != 0) { bpf_printk("error packet drop updating flow %d\n", ret); } diff --git a/docs/architecture.md b/docs/architecture.md index 56fa49326138298d4997f4891cdd7f5758c37eb8..f659b92a219fe28b97673dc3fea12d8d43fa281c 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -11,7 +11,7 @@ flowchart TD E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer) style E fill:#990 - E --> |"polls<br/>HashMap"| M(flow.MapTracer) + E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer) RB --> |chan *flow.Record| ACC(flow.Accounter) RB -.-> |flushes| M ACC --> |"chan []*flow.Record"| DD(flow.Deduper) diff --git a/examples/flowlogs-dump/server/flowlogs-dump-collector.go b/examples/flowlogs-dump/server/flowlogs-dump-collector.go index bed9028f3e348e05ffb39447bf73e76df55a4aba..8eb925d34ac8c68e7f56b3619dedc625a433535c 100644 --- a/examples/flowlogs-dump/server/flowlogs-dump-collector.go +++ b/examples/flowlogs-dump/server/flowlogs-dump-collector.go @@ -72,7 +72,7 @@ func main() { for records := range receivedRecords { for _, record := range records.Entries { if record.EthProtocol == ipv6 { - log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n", + log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n", ipProto[record.EthProtocol], record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"), record.Interface, @@ -92,9 +92,12 @@ func main() { record.GetDnsFlags(), record.DnsLatency.AsDuration().Milliseconds(), record.TimeFlowRtt.AsDuration().Nanoseconds(), + record.GetPktDropPackets(), + record.GetPktDropBytes(), + record.GetPktDropLatestDropCause(), ) } else { - log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n", + log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n", ipProto[record.EthProtocol], record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"), record.Interface, @@ -114,6 +117,9 @@ func main() { record.GetDnsFlags(), record.DnsLatency.AsDuration().Milliseconds(), record.TimeFlowRtt.AsDuration().Nanoseconds(), + record.GetPktDropPackets(), + record.GetPktDropBytes(), + record.GetPktDropLatestDropCause(), ) } } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 326de2b3923ed63877f2b02dceb639d6d6055639..a20e76b91a89a28a8bed023e06cdf5b11871d9e9 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -78,7 +78,7 @@ type ebpfFlowFetcher interface { io.Closer Register(iface ifaces.Interface) error - LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics + LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics DeleteMapsStaleEntries(timeOut time.Duration) ReadRingBuf() (ringbuf.Record, error) } diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 1cf3a1ff5017522ad7744967c604ce9dee3266c4..2710f3f6fd727ab5a93181cffc1ac69bd79c62bc 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -49,6 +49,11 @@ var ( DstPort: 456, IfIndex: 3, } + key1Dupe = ebpf.BpfFlowId{ + SrcPort: 123, + DstPort: 456, + IfIndex: 4, + } key2 = ebpf.BpfFlowId{ SrcPort: 333, @@ -66,7 +71,7 @@ func TestFlowsAgent_Deduplication(t *testing.T) { }) exported := export.Get(t, timeout) - assert.Len(t, exported, 1) + assert.Len(t, exported, 2) receivedKeys := map[ebpf.BpfFlowId]struct{}{} @@ -76,11 +81,21 @@ func TestFlowsAgent_Deduplication(t *testing.T) { receivedKeys[f.Id] = struct{}{} switch f.Id { case key1: - assert.EqualValues(t, 3, f.Metrics.Packets) - assert.EqualValues(t, 44, f.Metrics.Bytes) + assert.EqualValues(t, 4, f.Metrics.Packets) + assert.EqualValues(t, 66, f.Metrics.Bytes) assert.False(t, f.Duplicate) assert.Equal(t, "foo", f.Interface) key1Flows = append(key1Flows, f) + case key1Dupe: + assert.EqualValues(t, 4, f.Metrics.Packets) + assert.EqualValues(t, 66, f.Metrics.Bytes) + assert.False(t, f.Duplicate) + assert.Equal(t, "bar", f.Interface) + key1Flows = append(key1Flows, f) + case key2: + assert.EqualValues(t, 7, f.Metrics.Packets) + assert.EqualValues(t, 33, f.Metrics.Bytes) + assert.False(t, f.Duplicate) } } assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows) @@ -97,22 +112,33 @@ func TestFlowsAgent_DeduplicationJustMark(t *testing.T) { exported := export.Get(t, timeout) receivedKeys := map[ebpf.BpfFlowId]struct{}{} - assert.Len(t, exported, 1) + assert.Len(t, exported, 3) duplicates := 0 for _, f := range exported { require.NotContains(t, receivedKeys, f.Id) receivedKeys[f.Id] = struct{}{} switch f.Id { case key1: - assert.EqualValues(t, 3, f.Metrics.Packets) - assert.EqualValues(t, 44, f.Metrics.Bytes) + assert.EqualValues(t, 4, f.Metrics.Packets) + assert.EqualValues(t, 66, f.Metrics.Bytes) if f.Duplicate { duplicates++ } assert.Equal(t, "foo", f.Interface) + case key1Dupe: + assert.EqualValues(t, 4, f.Metrics.Packets) + assert.EqualValues(t, 66, f.Metrics.Bytes) + if f.Duplicate { + duplicates++ + } + assert.Equal(t, "bar", f.Interface) + case key2: + assert.EqualValues(t, 7, f.Metrics.Packets) + assert.EqualValues(t, 33, f.Metrics.Bytes) + assert.False(t, f.Duplicate) } } - assert.Equalf(t, 0, duplicates, "exported flows should have only one duplicate: %#v", exported) + assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported) } func TestFlowsAgent_Deduplication_None(t *testing.T) { @@ -123,7 +149,7 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) { }) exported := export.Get(t, timeout) - assert.Len(t, exported, 1) + assert.Len(t, exported, 3) receivedKeys := map[ebpf.BpfFlowId]struct{}{} var key1Flows []*flow.Record @@ -132,14 +158,24 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) { receivedKeys[f.Id] = struct{}{} switch f.Id { case key1: - assert.EqualValues(t, 3, f.Metrics.Packets) - assert.EqualValues(t, 44, f.Metrics.Bytes) + assert.EqualValues(t, 4, f.Metrics.Packets) + assert.EqualValues(t, 66, f.Metrics.Bytes) assert.False(t, f.Duplicate) assert.Equal(t, "foo", f.Interface) key1Flows = append(key1Flows, f) + case key1Dupe: + assert.EqualValues(t, 4, f.Metrics.Packets) + assert.EqualValues(t, 66, f.Metrics.Bytes) + assert.False(t, f.Duplicate) + assert.Equal(t, "bar", f.Interface) + key1Flows = append(key1Flows, f) + case key2: + assert.EqualValues(t, 7, f.Metrics.Packets) + assert.EqualValues(t, 33, f.Metrics.Bytes) + assert.False(t, f.Duplicate) } } - assert.Lenf(t, key1Flows, 1, "both key1 flows should have been forwarded: %#v", key1Flows) + assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows) } func TestFlowsAgent_Decoration(t *testing.T) { @@ -149,7 +185,7 @@ func TestFlowsAgent_Decoration(t *testing.T) { }) exported := export.Get(t, timeout) - assert.Len(t, exported, 1) + assert.Len(t, exported, 3) // Tests that the decoration stage has been properly executed. It should // add the interface name and the agent IP @@ -183,10 +219,17 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake { }) now := uint64(monotime.Now()) - key1Metrics := ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000} - - ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{ - key1: &key1Metrics, + key1Metrics := []ebpf.BpfFlowMetrics{ + {Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}, + {Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000}, + } + key2Metrics := []ebpf.BpfFlowMetrics{ + {Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000}, + } + ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{ + key1: key1Metrics, + key1Dupe: key1Metrics, + key2: key2Metrics, }) return export } diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o index b7d0979401747cc2a100c166a54cac0156314c5b..4af479c293eb0d9fc253c72caa9fbae675edb33f 100644 Binary files a/pkg/ebpf/bpf_bpfeb.o and b/pkg/ebpf/bpf_bpfeb.o differ diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o index 6cc2976532c7ce9021e109fa087c940ab959b188..9178cef275556d76f37c51fb275a15bb808b214f 100644 Binary files a/pkg/ebpf/bpf_bpfel.o and b/pkg/ebpf/bpf_bpfel.o differ diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 755016e4e496fde8ccd150f8b609e1268c9d48d6..8d505bb48bca25459fe22626bbf8b6cbd9d8228d 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -377,27 +377,28 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) { // TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively // Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md // Race conditions here causes that some flows are lost in high-load scenarios -func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]*BpfFlowMetrics { +func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics { flowMap := m.objects.AggregatedFlows iterator := flowMap.Iterate() - var flow = make(map[BpfFlowId]*BpfFlowMetrics, m.cacheMaxSize) + var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize) var id BpfFlowId - var metric BpfFlowMetrics + var metrics []BpfFlowMetrics // Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions // TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively - for iterator.Next(&id, &metric) { + for iterator.Next(&id, &metrics) { if err := flowMap.Delete(id); err != nil { log.WithError(err).WithField("flowId", id). Warnf("couldn't delete flow entry") } - metricPtr := new(BpfFlowMetrics) - *metricPtr = metric - flow[id] = metricPtr + // We observed that eBFP PerCPU map might insert multiple times the same key in the map + // (probably due to race conditions) so we need to re-join metrics again at userspace + // TODO: instrument how many times the keys are is repeated in the same eviction + flows[id] = append(flows[id], metrics...) } - return flow + return flows } // DeleteMapsStaleEntries Look for any stale entries in the features maps and delete them diff --git a/pkg/flow/account.go b/pkg/flow/account.go index b88840e04523abe8db2310b94aab6ceaa6caf149..d23223fdc44ab4b1a49ceacec20eb545f9ce9570 100644 --- a/pkg/flow/account.go +++ b/pkg/flow/account.go @@ -65,7 +65,9 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) { alog.Debug("exiting account routine") return } - if _, ok := c.entries[record.Id]; !ok { + if stored, ok := c.entries[record.Id]; ok { + Accumulate(stored, &record.Metrics) + } else { if len(c.entries) >= c.maxEntries { evictingEntries := c.entries c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{} diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go index c53df299c17401c296ccc2895a48913abd8b1206..1600549320e242b1dedfc1b3282a036a4ab8bed0 100644 --- a/pkg/flow/account_test.go +++ b/pkg/flow/account_test.go @@ -104,11 +104,11 @@ func TestEvict_MaxEntries(t *testing.T) { RawRecord: RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ - Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1, + Bytes: 444, Packets: 2, StartMonoTimeTs: 123, EndMonoTimeTs: 789, Flags: 1, }, }, TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond), - TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond), + TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond), }, k2: { RawRecord: RawRecord{ @@ -178,15 +178,15 @@ func TestEvict_Period(t *testing.T) { RawRecord: RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ - Bytes: 10, - Packets: 1, + Bytes: 30, + Packets: 3, StartMonoTimeTs: 123, - EndMonoTimeTs: 123, + EndMonoTimeTs: 789, Flags: 1, }, }, TimeFlowStart: now.Add(-1000 + 123), - TimeFlowEnd: now.Add(-1000 + 123), + TimeFlowEnd: now.Add(-1000 + 789), }, *records[0]) records = receiveTimeout(t, evictor) require.Len(t, records, 1) @@ -194,15 +194,15 @@ func TestEvict_Period(t *testing.T) { RawRecord: RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ - Bytes: 10, - Packets: 1, + Bytes: 20, + Packets: 2, StartMonoTimeTs: 1123, - EndMonoTimeTs: 1123, + EndMonoTimeTs: 1456, Flags: 1, }, }, TimeFlowStart: now.Add(-1000 + 1123), - TimeFlowEnd: now.Add(-1000 + 1123), + TimeFlowEnd: now.Add(-1000 + 1456), }, *records[0]) // no more flows are evicted diff --git a/pkg/flow/record.go b/pkg/flow/record.go index f3e173da102e354a4da2cf798e02b7e10d59ff93..9b28caab97a7d9b0a1baaa094c9cc47c481c2016 100644 --- a/pkg/flow/record.go +++ b/pkg/flow/record.go @@ -81,6 +81,38 @@ func NewRecord( return &record } +func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) { + // time == 0 if the value has not been yet set + if r.StartMonoTimeTs == 0 || r.StartMonoTimeTs > src.StartMonoTimeTs { + r.StartMonoTimeTs = src.StartMonoTimeTs + } + if r.EndMonoTimeTs == 0 || r.EndMonoTimeTs < src.EndMonoTimeTs { + r.EndMonoTimeTs = src.EndMonoTimeTs + } + r.Bytes += src.Bytes + r.Packets += src.Packets + r.Flags |= src.Flags + // Accumulate Drop statistics + r.PktDrops.Bytes += src.PktDrops.Bytes + r.PktDrops.Packets += src.PktDrops.Packets + r.PktDrops.LatestFlags |= src.PktDrops.LatestFlags + if src.PktDrops.LatestDropCause != 0 { + r.PktDrops.LatestDropCause = src.PktDrops.LatestDropCause + } + // Accumulate DNS + r.DnsRecord.Flags |= src.DnsRecord.Flags + if src.DnsRecord.Id != 0 { + r.DnsRecord.Id = src.DnsRecord.Id + } + if r.DnsRecord.Latency < src.DnsRecord.Latency { + r.DnsRecord.Latency = src.DnsRecord.Latency + } + // Accumulate RTT + if r.FlowRtt < src.FlowRtt { + r.FlowRtt = src.FlowRtt + } +} + // IP returns the net.IP equivalent object func IP(ia IPAddr) net.IP { return ia[:] diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go index 1cfd6d6df7287e0f16229f270d12aaa7697646a2..e46ae4be516ab94a714beffea954612238e7e3da 100644 --- a/pkg/flow/tracer_map.go +++ b/pkg/flow/tracer_map.go @@ -27,7 +27,7 @@ type MapTracer struct { } type mapFetcher interface { - LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics + LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics DeleteMapsStaleEntries(timeOut time.Duration) } @@ -96,7 +96,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows var forwardingFlows []*Record laterFlowNs := uint64(0) for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() { - aggregatedMetrics := flowMetrics + aggregatedMetrics := m.aggregate(flowMetrics) // we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored) if aggregatedMetrics.EndMonoTimeTs == 0 { continue @@ -126,3 +126,21 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows } mtlog.Debugf("%d flows evicted", len(forwardingFlows)) } + +func (m *MapTracer) aggregate(metrics []ebpf.BpfFlowMetrics) *ebpf.BpfFlowMetrics { + if len(metrics) == 0 { + mtlog.Warn("invoked aggregate with no values") + return &ebpf.BpfFlowMetrics{} + } + aggr := &ebpf.BpfFlowMetrics{} + for _, mt := range metrics { + // eBPF hashmap values are not zeroed when the entry is removed. That causes that we + // might receive entries from previous collect-eviction timeslots. + // We need to check the flow time and discard old flows. + if mt.StartMonoTimeTs <= m.lastEvictionNs || mt.EndMonoTimeTs <= m.lastEvictionNs { + continue + } + Accumulate(aggr, &mt) + } + return aggr +} diff --git a/pkg/flow/tracer_map_test.go b/pkg/flow/tracer_map_test.go index 9ea7c168008c4c020f72d855bac0dd806ba0b2e1..d0728791e5a2e9c0f4fd74ff35590b95dcd67ac2 100644 --- a/pkg/flow/tracer_map_test.go +++ b/pkg/flow/tracer_map_test.go @@ -11,25 +11,36 @@ import ( func TestPacketAggregation(t *testing.T) { type testCase struct { - input ebpf.BpfFlowMetrics + input []ebpf.BpfFlowMetrics expected ebpf.BpfFlowMetrics } tcs := []testCase{{ - input: ebpf.BpfFlowMetrics{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1}, + input: []ebpf.BpfFlowMetrics{ + {Packets: 0, Bytes: 0, StartMonoTimeTs: 0, EndMonoTimeTs: 0, Flags: 1}, + {Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1}, + {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, + {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, + }, expected: ebpf.BpfFlowMetrics{ Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1, }, }, { - input: ebpf.BpfFlowMetrics{Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1}, + input: []ebpf.BpfFlowMetrics{ + {Packets: 0x3, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1}, + {Packets: 0x2, Bytes: 0x8c, StartMonoTimeTs: 0x17f3e9633a7f, EndMonoTimeTs: 0x17f3e96f164e, Flags: 1}, + {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, + {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, + }, expected: ebpf.BpfFlowMetrics{ - Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1, + Packets: 0x5, Bytes: 0x5c4 + 0x8c, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1, }, }} + ft := MapTracer{} for i, tc := range tcs { t.Run(fmt.Sprint(i), func(t *testing.T) { assert.Equal(t, tc.expected, - tc.input) + *ft.aggregate(tc.input)) }) } } diff --git a/pkg/test/tracer_fake.go b/pkg/test/tracer_fake.go index 522c63326ac23ac99574de0c5484f09d7493dd8d..e41a95ac0ea43e1f487f065b8652042d9900b992 100644 --- a/pkg/test/tracer_fake.go +++ b/pkg/test/tracer_fake.go @@ -14,14 +14,14 @@ import ( // TracerFake fakes the kernel-side eBPF map structures for testing type TracerFake struct { interfaces map[ifaces.Interface]struct{} - mapLookups chan map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics + mapLookups chan map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics ringBuf chan ringbuf.Record } func NewTracerFake() *TracerFake { return &TracerFake{ interfaces: map[ifaces.Interface]struct{}{}, - mapLookups: make(chan map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics, 100), + mapLookups: make(chan map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics, 100), ringBuf: make(chan ringbuf.Record, 100), } } @@ -34,12 +34,12 @@ func (m *TracerFake) Register(iface ifaces.Interface) error { return nil } -func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics { +func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics { select { case r := <-m.mapLookups: return r default: - return map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{} + return map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{} } } @@ -50,7 +50,7 @@ func (m *TracerFake) ReadRingBuf() (ringbuf.Record, error) { return <-m.ringBuf, nil } -func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics) { +func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics) { m.mapLookups <- results }