From 338d2b2294dd39dac93d115148f813df0d3272cf Mon Sep 17 00:00:00 2001
From: "Mohamed S. Mahmoud" <mmahmoud@redhat.com>
Date: Tue, 25 Jul 2023 10:17:38 -0400
Subject: [PATCH] Evict DNS and RTT stale entries everytime we evict flows
 table (#163)

Signed-off-by: msherif1234 <mmahmoud@redhat.com>
---
 pkg/agent/agent.go      |  3 ++-
 pkg/agent/config.go     |  3 +++
 pkg/ebpf/tracer.go      | 47 +++++++++++++++++++++++++++++++++++++++++
 pkg/flow/tracer_map.go  | 18 ++++++++++------
 pkg/test/tracer_fake.go |  4 ++++
 5 files changed, 67 insertions(+), 8 deletions(-)

diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index 2e129b43..326de2b3 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -79,6 +79,7 @@ type ebpfFlowFetcher interface {
 	Register(iface ifaces.Interface) error
 
 	LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
+	DeleteMapsStaleEntries(timeOut time.Duration)
 	ReadRingBuf() (ringbuf.Record, error)
 }
 
@@ -163,7 +164,7 @@ func flowsAgent(cfg *Config,
 		return iface
 	}
 
-	mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout)
+	mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout)
 	rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout)
 	accounter := flow.NewAccounter(
 		cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now)
diff --git a/pkg/agent/config.go b/pkg/agent/config.go
index 426afe42..bf1c8241 100644
--- a/pkg/agent/config.go
+++ b/pkg/agent/config.go
@@ -146,4 +146,7 @@ type Config struct {
 	EnablePktDrops bool `env:"ENABLE_PKT_DROPS" envDefault:"false"`
 	// EnableDNSTracking enable DNS tracking eBPF hook to track dns query/response flows
 	EnableDNSTracking bool `env:"ENABLE_DNS_TRACKING" envDefault:"false"`
+	// StaleEntriesEvictTimeout specifies the maximum duration that stale entries are kept
+	// before being deleted, default is 5 seconds.
+	StaleEntriesEvictTimeout time.Duration `env:"STALE_ENTRIES_EVICT_TIMEOUT" envDefault:"5s"`
 }
diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go
index 0d80c209..48c49bf3 100644
--- a/pkg/ebpf/tracer.go
+++ b/pkg/ebpf/tracer.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io/fs"
 	"strings"
+	"time"
 
 	"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
 	"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
@@ -14,6 +15,7 @@ import (
 	"github.com/cilium/ebpf/link"
 	"github.com/cilium/ebpf/ringbuf"
 	"github.com/cilium/ebpf/rlimit"
+	"github.com/gavv/monotime"
 	"github.com/sirupsen/logrus"
 	"github.com/vishvananda/netlink"
 	"golang.org/x/sys/unix"
@@ -432,5 +434,50 @@ func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]*BpfFlowMetrics {
 		*metricPtr = metric
 		flow[id] = metricPtr
 	}
+
 	return flow
 }
+
+// DeleteMapsStaleEntries Look for any stale entries in the features maps and delete them
+func (m *FlowFetcher) DeleteMapsStaleEntries(timeOut time.Duration) {
+	m.lookupAndDeleteDNSMap(timeOut)
+	m.lookupAndDeleteRTTMap(timeOut)
+}
+
+// lookupAndDeleteDNSMap iterate over DNS queries map and delete any stale DNS requests
+// entries which never get responses for.
+func (m *FlowFetcher) lookupAndDeleteDNSMap(timeOut time.Duration) {
+	monotonicTimeNow := monotime.Now()
+	dnsMap := m.objects.DnsFlows
+	var dnsKey BpfDnsFlowId
+	var dnsVal uint64
+
+	iterator := dnsMap.Iterate()
+	for iterator.Next(&dnsKey, &dnsVal) {
+		if time.Duration(uint64(monotonicTimeNow)-dnsVal) >= timeOut {
+			if err := dnsMap.Delete(dnsKey); err != nil {
+				log.WithError(err).WithField("dnsKey", dnsKey).
+					Warnf("couldn't delete DNS record entry")
+			}
+		}
+	}
+}
+
+// lookupAndDeleteRTTMap iterate over flows sequence map and delete any
+// stale flows that we never get responses for.
+func (m *FlowFetcher) lookupAndDeleteRTTMap(timeOut time.Duration) {
+	monotonicTimeNow := monotime.Now()
+	rttMap := m.objects.FlowSequences
+	var rttKey BpfFlowSeqId
+	var rttVal uint64
+
+	iterator := rttMap.Iterate()
+	for iterator.Next(&rttKey, &rttVal) {
+		if time.Duration(uint64(monotonicTimeNow)-rttVal) >= timeOut {
+			if err := rttMap.Delete(rttKey); err != nil {
+				log.WithError(err).WithField("rttKey", rttKey).
+					Warnf("couldn't delete RTT record entry")
+			}
+		}
+	}
+}
diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go
index 8a011889..1cfd6d6d 100644
--- a/pkg/flow/tracer_map.go
+++ b/pkg/flow/tracer_map.go
@@ -18,8 +18,9 @@ var mtlog = logrus.WithField("component", "flow.MapTracer")
 // MapTracer accesses a mapped source of flows (the eBPF PerCPU HashMap), deserializes it into
 // a flow Record structure, and performs the accumulation of each perCPU-record into a single flow
 type MapTracer struct {
-	mapFetcher      mapFetcher
-	evictionTimeout time.Duration
+	mapFetcher               mapFetcher
+	evictionTimeout          time.Duration
+	staleEntriesEvictTimeout time.Duration
 	// manages the access to the eviction routines, avoiding two evictions happening at the same time
 	evictionCond   *sync.Cond
 	lastEvictionNs uint64
@@ -27,14 +28,16 @@ type MapTracer struct {
 
 type mapFetcher interface {
 	LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
+	DeleteMapsStaleEntries(timeOut time.Duration)
 }
 
-func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer {
+func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout time.Duration) *MapTracer {
 	return &MapTracer{
-		mapFetcher:      fetcher,
-		evictionTimeout: evictionTimeout,
-		lastEvictionNs:  uint64(monotime.Now()),
-		evictionCond:    sync.NewCond(&sync.Mutex{}),
+		mapFetcher:               fetcher,
+		evictionTimeout:          evictionTimeout,
+		lastEvictionNs:           uint64(monotime.Now()),
+		evictionCond:             sync.NewCond(&sync.Mutex{}),
+		staleEntriesEvictTimeout: staleEntriesEvictTimeout,
 	}
 }
 
@@ -109,6 +112,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
 			uint64(monotonicTimeNow),
 		))
 	}
+	m.mapFetcher.DeleteMapsStaleEntries(m.staleEntriesEvictTimeout)
 	m.lastEvictionNs = laterFlowNs
 	select {
 	case <-ctx.Done():
diff --git a/pkg/test/tracer_fake.go b/pkg/test/tracer_fake.go
index 960de0c6..522c6332 100644
--- a/pkg/test/tracer_fake.go
+++ b/pkg/test/tracer_fake.go
@@ -3,6 +3,7 @@ package test
 import (
 	"bytes"
 	"encoding/binary"
+	"time"
 
 	"github.com/cilium/ebpf/ringbuf"
 	"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
@@ -42,6 +43,9 @@ func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetric
 	}
 }
 
+func (m *TracerFake) DeleteMapsStaleEntries(_ time.Duration) {
+}
+
 func (m *TracerFake) ReadRingBuf() (ringbuf.Record, error) {
 	return <-m.ringBuf, nil
 }
-- 
GitLab