Newer
Older
"runtime"
"sync"
"time"
"github.com/gavv/monotime"
"github.com/netobserv/gopipes/pkg/node"
"github.com/sirupsen/logrus"
Mohamed S. Mahmoud
committed
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
)
// mtlog is the logger used by the MapTracer code in this file; every entry it emits
// carries the field component=flow.MapTracer.
var mtlog = logrus.WithField("component", "flow.MapTracer")
// MapTracer accesses a mapped source of flows (the eBPF PerCPU HashMap), deserializes it into
// a flow Record structure, and performs the accumulation of each perCPU-record into a single flow.
// Evictions are requested periodically by TraceLoop, or on demand through Flush.
type MapTracer struct {
// mapFetcher is the source the flows are read (and removed) from on every eviction
mapFetcher mapFetcher
// evictionTimeout is the period of the ticker that TraceLoop uses to request evictions
evictionTimeout time.Duration
// staleEntriesEvictTimeout is handed to mapFetcher.DeleteMapsStaleEntries after each eviction
staleEntriesEvictTimeout time.Duration
// manages the access to the eviction routines, avoiding two evictions happening at the same time
evictionCond *sync.Cond
// lastEvictionNs is a monotonic timestamp in nanoseconds, initialized at construction time.
// aggregate discards any per-CPU entry whose start or end timestamp is not later than it.
lastEvictionNs uint64
}
type mapFetcher interface {
Mohamed S. Mahmoud
committed
LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout time.Duration) *MapTracer {
mapFetcher: fetcher,
evictionTimeout: evictionTimeout,
lastEvictionNs: uint64(monotime.Now()),
evictionCond: sync.NewCond(&sync.Mutex{}),
staleEntriesEvictTimeout: staleEntriesEvictTimeout,
}
}
// Flush requests an immediate eviction: it wakes every goroutine waiting on the eviction
// condition. The actual reading (and removing) of the flows from the source eBPF map, and
// their forwarding to the next pipeline stage, is performed asynchronously by the
// evictionSynchronization goroutine that TraceLoop starts; Flush itself does not block on it.
func (m *MapTracer) Flush() {
	cond := m.evictionCond
	cond.Broadcast()
}
// TraceLoop returns the pipeline start function of the tracer. When invoked, that function
// starts the eviction goroutine (evictionSynchronization) and then requests an eviction
// every evictionTimeout until ctx is canceled.
//
// enableGC is forwarded to the eviction routine, which runs runtime.GC() after each
// eviction when it is true.
func (m *MapTracer) TraceLoop(ctx context.Context, enableGC bool) node.StartFunc[[]*Record] {
	return func(out chan<- []*Record) {
		evictionTicker := time.NewTicker(m.evictionTimeout)
		defer evictionTicker.Stop()
		go m.evictionSynchronization(ctx, enableGC, out)
		for {
			select {
			case <-ctx.Done():
				mtlog.Debug("exiting trace loop due to context cancellation")
				// The eviction goroutine is normally parked in evictionCond.Wait(), which only
				// returns on a signal. Wake it so it can observe the canceled context and exit
				// instead of leaking.
				m.evictionCond.Broadcast()
				return
			case <-evictionTicker.C:
				mtlog.Debug("triggering flow eviction on timer")
				m.Flush()
			}
		}
	}
}
// evictionSynchronization loop just waits for the evictionCond to happen
// and triggers the actual eviction. It makes sure that only one eviction
// is being triggered at the same time.
//
// The loop ends when it observes that ctx is canceled, either right before waiting or right
// after being woken up. The condition's lock is always released before returning.
func (m *MapTracer) evictionSynchronization(ctx context.Context, enableGC bool, out chan<- []*Record) {
	// waitAndEvict runs a single wait/evict cycle and reports whether the loop must stop.
	// The deferred Unlock guarantees the lock is released on every path, including the
	// cancellation ones.
	waitAndEvict := func() bool {
		// make sure we only evict once at a time, even if there are multiple eviction signals
		m.evictionCond.L.Lock()
		defer m.evictionCond.L.Unlock()

		// A cancellation may have happened while the previous eviction was running; in that
		// case nobody is going to signal the condition again, so don't wait for it.
		if ctx.Err() != nil {
			mtlog.Debug("context canceled. Stopping goroutine before evicting flows")
			return true
		}
		// keep waiting for eviction until someone triggers the evictionCond.Broadcast signal
		m.evictionCond.Wait()
		if ctx.Err() != nil {
			mtlog.Debug("context canceled. Stopping goroutine before evicting flows")
			return true
		}
		mtlog.Debug("evictionSynchronization signal received")
		m.evictFlows(ctx, enableGC, out)
		return false
	}

	for {
		if waitAndEvict() {
			return
		}
	}
}
func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows chan<- []*Record) {
// it's important that this monotonic timer reports same or approximate values as kernel-side bpf_ktime_get_ns()
monotonicTimeNow := monotime.Now()
currentTime := time.Now()
var forwardingFlows []*Record
laterFlowNs := uint64(0)
for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() {
Mohamed S. Mahmoud
committed
aggregatedMetrics := m.aggregate(flowMetrics)
// we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored)
Mohamed S. Mahmoud
committed
if aggregatedMetrics.EndMonoTimeTs == 0 {
continue
}
// If it iterated an entry that do not have updated flows
Mohamed S. Mahmoud
committed
if aggregatedMetrics.EndMonoTimeTs > laterFlowNs {
laterFlowNs = aggregatedMetrics.EndMonoTimeTs
}
forwardingFlows = append(forwardingFlows, NewRecord(
flowKey,
aggregatedMetrics,
currentTime,
uint64(monotonicTimeNow),
))
}
m.mapFetcher.DeleteMapsStaleEntries(m.staleEntriesEvictTimeout)
select {
case <-ctx.Done():
mtlog.Debug("skipping flow eviction as agent is being stopped")
default:
forwardFlows <- forwardingFlows
}
if enableGC {
runtime.GC()
}
mtlog.Debugf("%d flows evicted", len(forwardingFlows))
}
Mohamed S. Mahmoud
committed
// aggregate merges the given per-CPU metrics of a flow into a single, newly allocated
// BpfFlowMetrics by means of Accumulate. Entries whose start or end timestamp is not later
// than m.lastEvictionNs are left out. The result is the zero value when metrics is empty
// (a warning is logged) or when every entry has been left out.
func (m *MapTracer) aggregate(metrics []ebpf.BpfFlowMetrics) *ebpf.BpfFlowMetrics {
	total := new(ebpf.BpfFlowMetrics)
	if len(metrics) == 0 {
		mtlog.Warn("invoked aggregate with no values")
		return total
	}
	for i := range metrics {
		// work on a copy, so Accumulate never gets a pointer into the caller's slice
		entry := metrics[i]
		// eBPF hashmap values are not zeroed when the entry is removed, so entries that belong
		// to previous collect-eviction timeslots might show up here. Only the entries that
		// started and ended after the last eviction are accumulated.
		if entry.StartMonoTimeTs > m.lastEvictionNs && entry.EndMonoTimeTs > m.lastEvictionNs {
			Accumulate(total, &entry)
		}
	}
	return total
}