Commit 45926b97 authored by Joel Takvorian, committed by GitHub

NETOBSERV-559: use LookupAndDelete to read maps (#283)

Keep legacy code for old kernels.

Do not base support detection on the kernel version: instead, just try
LookupAndDelete and fall back to the legacy code path when relevant.
parent 58d01d93
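The strategy visible across all the hunks below is feature detection by attempt rather than by version string: call the newer map API once and, if the very first call fails with ebpf.ErrNotSupported, permanently downgrade to the legacy Iterate+Delete path. This matters because, going by the bcc kernel-versions table linked in the code, the BPF_MAP_LOOKUP_AND_DELETE_ELEM syscall appeared in 4.20 for queue/stack maps but only reached hash maps later (around 5.14), so a version check is easy to get wrong. A minimal, self-contained sketch of the pattern, assuming a cilium/ebpf hash map with uint32 keys and values; all names here are illustrative, not the agent's actual code:

package sketch

import (
    "errors"
    "log"

    "github.com/cilium/ebpf"
)

// drain empties a hash map, preferring the atomic LookupAndDelete and falling
// back to legacy Iterate+Delete on kernels that do not support it.
func drain(m *ebpf.Map, lookupAndDeleteSupported *bool) map[uint32]uint32 {
    out := map[uint32]uint32{}
    var k, v uint32
    if !*lookupAndDeleteSupported {
        // Legacy path: delete while iterating (slower, may revisit keys).
        it := m.Iterate()
        for it.Next(&k, &v) {
            if err := m.Delete(&k); err != nil {
                log.Printf("delete failed: %v", err)
            }
            out[k] = v
        }
        return out
    }
    // Snapshot the keys first, then delete each one atomically.
    var keys []uint32
    it := m.Iterate()
    for it.Next(&k, &v) {
        keys = append(keys, k)
    }
    for i, key := range keys {
        if err := m.LookupAndDelete(&key, &v); err != nil {
            if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
                *lookupAndDeleteSupported = false // downgrade once, permanently
                return drain(m, lookupAndDeleteSupported)
            }
            continue // entry may have been removed concurrently; skip it
        }
        out[key] = v
    }
    return out
}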
@@ -53,17 +53,18 @@ var plog = logrus.WithField("component", "ebpf.PacketFetcher")
// and to flows that are forwarded by the kernel via ringbuffer because could not be aggregated
// in the map
type FlowFetcher struct {
    objects            *BpfObjects
    qdiscs             map[ifaces.Interface]*netlink.GenericQdisc
    egressFilters      map[ifaces.Interface]*netlink.BpfFilter
    ingressFilters     map[ifaces.Interface]*netlink.BpfFilter
    ringbufReader      *ringbuf.Reader
    cacheMaxSize       int
    enableIngress      bool
    enableEgress       bool
    pktDropsTracePoint link.Link
    rttFentryLink      link.Link
    rttKprobeLink      link.Link
+   lookupAndDeleteSupported bool
}

type FlowFetcherConfig struct {
@@ -119,7 +120,10 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
        return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
    }
-   oldKernel := utils.IskernelOlderthan514()
+   oldKernel := utils.IsKernelOlderThan("5.14.0")
+   if oldKernel {
+       log.Infof("kernel older than 5.14.0 detected: not all hooks are supported")
+   }
    objects, err := kernelSpecificLoadAndAssign(oldKernel, spec)
    if err != nil {
        return nil, err
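kernelSpecificLoadAndAssign is where the oldKernel flag is consumed: programs whose hook types the running kernel lacks must be dropped from the collection before loading, or verification fails. A hedged sketch of that general technique with cilium/ebpf; the program name below is hypothetical and not taken from this repository:

package sketch

import "github.com/cilium/ebpf"

// loadForKernel removes programs whose hook types the running kernel predates,
// so that the remaining programs still load and verify. Sketch only.
func loadForKernel(oldKernel bool, spec *ebpf.CollectionSpec) (*ebpf.Collection, error) {
    if oldKernel {
        delete(spec.Programs, "tcp_rcv_fentry") // hypothetical program name
    }
    return ebpf.NewCollection(spec)
}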
@@ -165,17 +169,18 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
    }
    return &FlowFetcher{
        objects:            &objects,
        ringbufReader:      flows,
        egressFilters:      map[ifaces.Interface]*netlink.BpfFilter{},
        ingressFilters:     map[ifaces.Interface]*netlink.BpfFilter{},
        qdiscs:             map[ifaces.Interface]*netlink.GenericQdisc{},
        cacheMaxSize:       cfg.CacheMaxSize,
        enableIngress:      cfg.EnableIngress,
        enableEgress:       cfg.EnableEgress,
        pktDropsTracePoint: pktDropsLink,
        rttFentryLink:      rttFentryLink,
        rttKprobeLink:      rttKprobeLink,
+       lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
    }, nil
}
@@ -404,35 +409,41 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
}

// LookupAndDeleteMap reads all the entries from the eBPF map and removes them from it.
-// It returns a map where the key
-// For synchronization purposes, we get/delete a whole snapshot of the flows map.
-// This way we avoid missing packets that could be updated on the
-// ebpf side while we process/aggregate them here
-// Changing this method invocation by BatchLookupAndDelete could improve performance
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
-// Race conditions here causes that some flows are lost in high-load scenarios
func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
+   if !m.lookupAndDeleteSupported {
+       return m.legacyLookupAndDeleteMap(met)
+   }
    flowMap := m.objects.AggregatedFlows
    iterator := flowMap.Iterate()
    var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
+   var ids []BpfFlowId
    var id BpfFlowId
    var metrics []BpfFlowMetrics
-   count := 0
-   // Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
-   // TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
+   // First, get all ids and don't care about metrics (we need lookup+delete to be atomic)
    for iterator.Next(&id, &metrics) {
+       ids = append(ids, id)
+   }
+
+   count := 0
+   // Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
+   for i, id := range ids {
        count++
-       if err := flowMap.Delete(id); err != nil {
+       if err := flowMap.LookupAndDelete(&id, &metrics); err != nil {
+           if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
+               log.WithError(err).Warnf("switching to legacy mode")
+               m.lookupAndDeleteSupported = false
+               return m.legacyLookupAndDeleteMap(met)
+           }
            log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
            met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
+           continue
        }
-       // We observed that eBFP PerCPU map might insert multiple times the same key in the map
-       // (probably due to race conditions) so we need to re-join metrics again at userspace
-       // TODO: instrument how many times the keys are is repeated in the same eviction
-       flows[id] = append(flows[id], metrics...)
+       flows[id] = metrics
    }
    met.BufferSizeGauge.WithBufferName("hashmap-total").Set(float64(count))
    met.BufferSizeGauge.WithBufferName("hashmap-unique").Set(float64(len(flows)))
@@ -451,16 +462,21 @@ func (m *FlowFetcher) lookupAndDeleteDNSMap(timeOut time.Duration) {
    monotonicTimeNow := monotime.Now()
    dnsMap := m.objects.DnsFlows
    var dnsKey BpfDnsFlowId
+   var keysToDelete []BpfDnsFlowId
    var dnsVal uint64
    if dnsMap != nil {
+       // Ideally the Lookup + Delete should be atomic, however we cannot use LookupAndDelete since the deletion is conditional
+       // Do not delete while iterating, as it causes severe performance degradation
        iterator := dnsMap.Iterate()
        for iterator.Next(&dnsKey, &dnsVal) {
            if time.Duration(uint64(monotonicTimeNow)-dnsVal) >= timeOut {
-               if err := dnsMap.Delete(dnsKey); err != nil {
-                   log.WithError(err).WithField("dnsKey", dnsKey).
-                       Warnf("couldn't delete DNS record entry")
-               }
+               keysToDelete = append(keysToDelete, dnsKey)
            }
        }
+       for _, dnsKey = range keysToDelete {
+           if err := dnsMap.Delete(dnsKey); err != nil {
+               log.WithError(err).WithField("dnsKey", dnsKey).Warnf("couldn't delete DNS record entry")
+           }
+       }
    }
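The two-pass shape added here exists because deleting from a BPF hash map while walking it can make the key iterator restart and revisit buckets, which degrades badly on large maps; and since deletion is conditional on the timeout, LookupAndDelete is no help either. The same pattern in isolation, as a generic sketch with illustrative types and names:

// deleteWhere collects matching keys during the walk and deletes them only
// afterwards, so the iteration is never perturbed mid-walk. Sketch only.
func deleteWhere(m *ebpf.Map, match func(key, val uint64) bool) error {
    var k, v uint64
    var doomed []uint64
    it := m.Iterate()
    for it.Next(&k, &v) {
        if match(k, v) {
            doomed = append(doomed, k)
        }
    }
    for _, key := range doomed {
        if err := m.Delete(&key); err != nil {
            return err
        }
    }
    return it.Err()
}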
@@ -529,14 +545,15 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf
// It provides access to packets from the kernel space (via PerfCPU hashmap)
type PacketFetcher struct {
    objects        *BpfObjects
    qdiscs         map[ifaces.Interface]*netlink.GenericQdisc
    egressFilters  map[ifaces.Interface]*netlink.BpfFilter
    ingressFilters map[ifaces.Interface]*netlink.BpfFilter
    perfReader     *perf.Reader
    cacheMaxSize   int
    enableIngress  bool
    enableEgress   bool
+   lookupAndDeleteSupported bool
}

func NewPacketFetcher(
@@ -605,14 +622,15 @@ func NewPacketFetcher(
    }
    return &PacketFetcher{
        objects:        &objects,
        perfReader:     packets,
        egressFilters:  map[ifaces.Interface]*netlink.BpfFilter{},
        ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
        qdiscs:         map[ifaces.Interface]*netlink.GenericQdisc{},
        cacheMaxSize:   cacheMaxSize,
        enableIngress:  ingress,
        enableEgress:   egress,
+       lookupAndDeleteSupported: true, // this will be turned off later if found to be not supported
    }, nil
}
@@ -797,19 +815,35 @@ func (p *PacketFetcher) ReadPerf() (perf.Record, error) {
}

func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
+   if !p.lookupAndDeleteSupported {
+       return p.legacyLookupAndDeleteMap(met)
+   }
    packetMap := p.objects.PacketRecord
    iterator := packetMap.Iterate()
    packets := make(map[int][]*byte, p.cacheMaxSize)
    var id int
+   var ids []int
    var packet []*byte
+   // First, get all ids and ignore content (we need lookup+delete to be atomic)
    for iterator.Next(&id, &packet) {
-       if err := packetMap.Delete(id); err != nil {
-           log.WithError(err).WithField("packetID ", id).
-               Warnf("couldn't delete entry")
-           met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteFlows").Inc()
+       ids = append(ids, id)
+   }
+
+   // Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
+   for i, id := range ids {
+       if err := packetMap.LookupAndDelete(&id, &packet); err != nil {
+           if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
+               log.WithError(err).Warnf("switching to legacy mode")
+               p.lookupAndDeleteSupported = false
+               return p.legacyLookupAndDeleteMap(met)
+           }
+           log.WithError(err).WithField("packetID", id).Warnf("couldn't delete entry")
+           met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry").Inc()
        }
-       packets[id] = append(packets[id], packet...)
+       packets[id] = packet
    }
    return packets
}
(new file: legacy implementations kept for old kernels)

package ebpf

import "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"

// This file contains legacy implementations kept for old kernels

func (m *FlowFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]BpfFlowMetrics {
    flowMap := m.objects.AggregatedFlows
    iterator := flowMap.Iterate()
    var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
    var id BpfFlowId
    var metrics []BpfFlowMetrics

    count := 0
    // Deleting while iterating is really bad for performance (like, really!) as it causes seeing multiple times the same key
    // This is solved in >=4.20 kernels with LookupAndDelete
    for iterator.Next(&id, &metrics) {
        count++
        if err := flowMap.Delete(id); err != nil {
            log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
            met.Errors.WithErrorName("flow-fetcher-legacy", "CannotDeleteFlows").Inc()
        }
        // We observed that the eBPF PerCPU map might insert the same key multiple times
        // (probably due to race conditions) so we need to re-join metrics again at userspace
        flows[id] = append(flows[id], metrics...)
    }
    met.BufferSizeGauge.WithBufferName("hashmap-legacy-total").Set(float64(count))
    met.BufferSizeGauge.WithBufferName("hashmap-legacy-unique").Set(float64(len(flows)))
    return flows
}

func (p *PacketFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
    packetMap := p.objects.PacketRecord
    iterator := packetMap.Iterate()
    packets := make(map[int][]*byte, p.cacheMaxSize)
    var id int
    var packet []*byte

    for iterator.Next(&id, &packet) {
        if err := packetMap.Delete(id); err != nil {
            log.WithError(err).WithField("packetID", id).Warnf("couldn't delete entry")
            met.Errors.WithErrorName("pkt-fetcher-legacy", "CannotDeleteEntry").Inc()
        }
        packets[id] = append(packets[id], packet...)
    }
    return packets
}
(utils package)

@@ -12,10 +12,18 @@ import (
)

var (
-   getCurrentKernelVersion = currentKernelVersion
+   kernelVersion uint32
    log = logrus.WithField("component", "utils")
)

+func init() {
+   var err error
+   kernelVersion, err = currentKernelVersion()
+   if err != nil {
+       log.Errorf("failed to get current kernel version: %v", err)
+   }
+}
+
// GetSocket returns socket string in the correct format based on address family
func GetSocket(hostIP string, hostPort int) string {
    socket := fmt.Sprintf("%s:%d", hostIP, hostPort)
@@ -26,22 +34,13 @@ func GetSocket(hostIP string, hostPort int) string {
    return socket
}

-func IskernelOlderthan514() bool {
-   kernelVersion514, err := kernelVersionFromReleaseString("5.14.0")
+func IsKernelOlderThan(version string) bool {
+   refVersion, err := kernelVersionFromReleaseString(version)
    if err != nil {
        log.Warnf("failed to get kernel version from release string: %v", err)
        return false
    }
-   currentVersion, err := getCurrentKernelVersion()
-   if err != nil {
-       log.Warnf("failed to get current kernel version: %v", err)
-       return false
-   }
-   if currentVersion < kernelVersion514 {
-       log.Infof("older kernel version not all hooks will be supported")
-       return true
-   }
-   return false
+   return kernelVersion != 0 && kernelVersion < refVersion
}

var versionRegex = regexp.MustCompile(`^(\d+)\.(\d+).(\d+).*$`)
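For reference, kernelVersionFromReleaseString (whose versionRegex appears just above) reduces a release string to a single integer so that a plain < comparison orders versions. A hedged sketch of the conventional encoding, mirroring the kernel's KERNEL_VERSION(a,b,c) macro; whether this package clamps the patch level the same way is an assumption:

// Sketch: pack major.minor.patch into one uint32, highest component first,
// so that encodeKernelVersion(5, 13, 0) < encodeKernelVersion(5, 14, 0).
func encodeKernelVersion(major, minor, patch uint32) uint32 {
    if patch > 255 {
        patch = 255 // KERNEL_VERSION clamps the patch level to one byte
    }
    return (major << 16) + (minor << 8) + patch
}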
...
(utils package, test file)

package utils

import (
-   "errors"
    "testing"
+
+   "github.com/stretchr/testify/require"
)

func TestIskernelOlderthan514(t *testing.T) {
    tests := []struct {
        name string
-       mockKernelVersion func() (uint32, error)
+       kernelVersion string
        want bool
+       wantError bool
    }{
        {
            name: "Kernel version < 5.14.0",
-           mockKernelVersion: func() (uint32, error) {
-               ver, _ := kernelVersionFromReleaseString("5.13.0")
-               return ver, nil
-           },
-           want: true,
+           kernelVersion: "5.13.0",
+           want:          true,
        },
        {
            name: "Kernel version = 5.14.0",
-           mockKernelVersion: func() (uint32, error) {
-               ver, _ := kernelVersionFromReleaseString("5.14.0")
-               return ver, nil
-           },
-           want: false,
+           kernelVersion: "5.14.0",
+           want:          false,
        },
        {
            name: "Kernel version > 5.14.0",
-           mockKernelVersion: func() (uint32, error) {
-               ver, _ := kernelVersionFromReleaseString("5.15.0")
-               return ver, nil
-           },
-           want: false,
+           kernelVersion: "5.15.0",
+           want:          false,
        },
        {
            name: "Error getting kernel version",
-           mockKernelVersion: func() (uint32, error) {
-               return 0, errors.New("error")
-           },
-           want: false,
+           kernelVersion: "invalid version",
+           want:          false,
+           wantError:     true,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
-           getCurrentKernelVersion = tt.mockKernelVersion
-           got := IskernelOlderthan514()
+           versionUint, err := kernelVersionFromReleaseString(tt.kernelVersion)
+           if tt.wantError {
+               require.Errorf(t, err, "%s: expecting error, got none", tt.name)
+           } else {
+               require.NoErrorf(t, err, "%s: expecting no error, got %v", tt.name, err)
+           }
+           kernelVersion = versionUint
+           got := IsKernelOlderThan("5.14.0")
            if got != tt.want {
                t.Errorf("%s: IskernelOlderthan514() = %v, want %v", tt.name, got, tt.want)
            }
...