Skip to content
Snippets Groups Projects
Unverified Commit 338d2b22 authored by Mohamed S. Mahmoud's avatar Mohamed S. Mahmoud Committed by GitHub
Browse files

Evict DNS and RTT stale entries every time we evict the flows table (#163)


Signed-off-by: msherif1234 <mmahmoud@redhat.com>
parent 3955ce8d
No related branches found
No related tags found
No related merge requests found
...@@ -79,6 +79,7 @@ type ebpfFlowFetcher interface { ...@@ -79,6 +79,7 @@ type ebpfFlowFetcher interface {
Register(iface ifaces.Interface) error Register(iface ifaces.Interface) error
LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
ReadRingBuf() (ringbuf.Record, error) ReadRingBuf() (ringbuf.Record, error)
} }
...@@ -163,7 +164,7 @@ func flowsAgent(cfg *Config, ...@@ -163,7 +164,7 @@ func flowsAgent(cfg *Config,
return iface return iface
} }
mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout) mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout)
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout) rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout)
accounter := flow.NewAccounter( accounter := flow.NewAccounter(
cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now) cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now)
......
...@@ -146,4 +146,7 @@ type Config struct { ...@@ -146,4 +146,7 @@ type Config struct {
EnablePktDrops bool `env:"ENABLE_PKT_DROPS" envDefault:"false"` EnablePktDrops bool `env:"ENABLE_PKT_DROPS" envDefault:"false"`
// EnableDNSTracking enable DNS tracking eBPF hook to track dns query/response flows // EnableDNSTracking enable DNS tracking eBPF hook to track dns query/response flows
EnableDNSTracking bool `env:"ENABLE_DNS_TRACKING" envDefault:"false"` EnableDNSTracking bool `env:"ENABLE_DNS_TRACKING" envDefault:"false"`
// StaleEntriesEvictTimeout specifies the maximum duration that stale entries are kept
// before being deleted, default is 5 seconds.
StaleEntriesEvictTimeout time.Duration `env:"STALE_ENTRIES_EVICT_TIMEOUT" envDefault:"5s"`
} }
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io/fs" "io/fs"
"strings" "strings"
"time"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces" "github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils" "github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
...@@ -14,6 +15,7 @@ import ( ...@@ -14,6 +15,7 @@ import (
"github.com/cilium/ebpf/link" "github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/ringbuf" "github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit" "github.com/cilium/ebpf/rlimit"
"github.com/gavv/monotime"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vishvananda/netlink" "github.com/vishvananda/netlink"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
...@@ -432,5 +434,50 @@ func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]*BpfFlowMetrics { ...@@ -432,5 +434,50 @@ func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]*BpfFlowMetrics {
*metricPtr = metric *metricPtr = metric
flow[id] = metricPtr flow[id] = metricPtr
} }
return flow return flow
} }
// DeleteMapsStaleEntries purges stale entries from the per-feature maps: the DNS
// map first, then the RTT map. timeOut is forwarded unchanged to both helpers.
func (m *FlowFetcher) DeleteMapsStaleEntries(timeOut time.Duration) {
	evictors := [...]func(time.Duration){
		m.lookupAndDeleteDNSMap,
		m.lookupAndDeleteRTTMap,
	}
	for _, evict := range evictors {
		evict(timeOut)
	}
}
// lookupAndDeleteDNSMap iterates over the DNS queries map and deletes every entry
// whose stored value, treated as a monotonic timestamp, is at least timeOut older
// than the current monotonic time, i.e. stale DNS requests that never got a
// response.
//
// Stale keys are collected first and deleted only once the iteration is over:
// removing entries from an eBPF map while iterating over it can make the iterator
// skip entries, restart from the beginning, or abort. Failures are only logged;
// nothing is returned to the caller.
func (m *FlowFetcher) lookupAndDeleteDNSMap(timeOut time.Duration) {
	monotonicTimeNow := monotime.Now()
	dnsMap := m.objects.DnsFlows
	var dnsKey BpfDnsFlowId
	var dnsVal uint64
	var staleKeys []BpfDnsFlowId

	iterator := dnsMap.Iterate()
	for iterator.Next(&dnsKey, &dnsVal) {
		if time.Duration(uint64(monotonicTimeNow)-dnsVal) >= timeOut {
			staleKeys = append(staleKeys, dnsKey)
		}
	}
	// Next returns false both at the end of the map and on failure, so the
	// error has to be checked explicitly to notice an interrupted scan.
	if err := iterator.Err(); err != nil {
		log.WithError(err).Warnf("couldn't iterate over DNS records map")
	}

	// Keys gathered before a failed scan are still stale, so delete them anyway.
	for _, staleKey := range staleKeys {
		if err := dnsMap.Delete(staleKey); err != nil {
			log.WithError(err).WithField("dnsKey", staleKey).
				Warnf("couldn't delete DNS record entry")
		}
	}
}
// lookupAndDeleteRTTMap iterates over the flow sequences map and deletes every
// entry whose stored value, treated as a monotonic timestamp, is at least timeOut
// older than the current monotonic time, i.e. stale flows that never got a
// response.
//
// Stale keys are collected first and deleted only once the iteration is over:
// removing entries from an eBPF map while iterating over it can make the iterator
// skip entries, restart from the beginning, or abort. Failures are only logged;
// nothing is returned to the caller.
func (m *FlowFetcher) lookupAndDeleteRTTMap(timeOut time.Duration) {
	monotonicTimeNow := monotime.Now()
	rttMap := m.objects.FlowSequences
	var rttKey BpfFlowSeqId
	var rttVal uint64
	var staleKeys []BpfFlowSeqId

	iterator := rttMap.Iterate()
	for iterator.Next(&rttKey, &rttVal) {
		if time.Duration(uint64(monotonicTimeNow)-rttVal) >= timeOut {
			staleKeys = append(staleKeys, rttKey)
		}
	}
	// Next returns false both at the end of the map and on failure, so the
	// error has to be checked explicitly to notice an interrupted scan.
	if err := iterator.Err(); err != nil {
		log.WithError(err).Warnf("couldn't iterate over RTT records map")
	}

	// Keys gathered before a failed scan are still stale, so delete them anyway.
	for _, staleKey := range staleKeys {
		if err := rttMap.Delete(staleKey); err != nil {
			log.WithError(err).WithField("rttKey", staleKey).
				Warnf("couldn't delete RTT record entry")
		}
	}
}
...@@ -20,6 +20,7 @@ var mtlog = logrus.WithField("component", "flow.MapTracer") ...@@ -20,6 +20,7 @@ var mtlog = logrus.WithField("component", "flow.MapTracer")
type MapTracer struct { type MapTracer struct {
mapFetcher mapFetcher mapFetcher mapFetcher
evictionTimeout time.Duration evictionTimeout time.Duration
staleEntriesEvictTimeout time.Duration
// manages the access to the eviction routines, avoiding two evictions happening at the same time // manages the access to the eviction routines, avoiding two evictions happening at the same time
evictionCond *sync.Cond evictionCond *sync.Cond
lastEvictionNs uint64 lastEvictionNs uint64
...@@ -27,14 +28,16 @@ type MapTracer struct { ...@@ -27,14 +28,16 @@ type MapTracer struct {
type mapFetcher interface { type mapFetcher interface {
LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
} }
func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer { func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout time.Duration) *MapTracer {
return &MapTracer{ return &MapTracer{
mapFetcher: fetcher, mapFetcher: fetcher,
evictionTimeout: evictionTimeout, evictionTimeout: evictionTimeout,
lastEvictionNs: uint64(monotime.Now()), lastEvictionNs: uint64(monotime.Now()),
evictionCond: sync.NewCond(&sync.Mutex{}), evictionCond: sync.NewCond(&sync.Mutex{}),
staleEntriesEvictTimeout: staleEntriesEvictTimeout,
} }
} }
...@@ -109,6 +112,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows ...@@ -109,6 +112,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
uint64(monotonicTimeNow), uint64(monotonicTimeNow),
)) ))
} }
m.mapFetcher.DeleteMapsStaleEntries(m.staleEntriesEvictTimeout)
m.lastEvictionNs = laterFlowNs m.lastEvictionNs = laterFlowNs
select { select {
case <-ctx.Done(): case <-ctx.Done():
......
...@@ -3,6 +3,7 @@ package test ...@@ -3,6 +3,7 @@ package test
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"time"
"github.com/cilium/ebpf/ringbuf" "github.com/cilium/ebpf/ringbuf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
...@@ -42,6 +43,9 @@ func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetric ...@@ -42,6 +43,9 @@ func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetric
} }
} }
// DeleteMapsStaleEntries is a no-op on the fake tracer: the timeout argument is
// ignored and nothing is deleted.
func (m *TracerFake) DeleteMapsStaleEntries(_ time.Duration) {}
func (m *TracerFake) ReadRingBuf() (ringbuf.Record, error) { func (m *TracerFake) ReadRingBuf() (ringbuf.Record, error) {
return <-m.ringBuf, nil return <-m.ringBuf, nil
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment