Skip to content
Snippets Groups Projects
Unverified Commit 1d9a9b6d authored by Mario Macias's avatar Mario Macias Committed by GitHub
Browse files

NETOBSERV-201: configure LogLevel and Sampling (#16)

* NETOBSERV-201: configure LogLevel and Sampling

* updated go mod
parent e9b1d443
No related branches found
No related tags found
No related merge requests found
...@@ -23,6 +23,9 @@ struct { ...@@ -23,6 +23,9 @@ struct {
__uint(max_entries, 1 << 24); __uint(max_entries, 1 << 24);
} flows SEC(".maps"); } flows SEC(".maps");
// Constant definitions, to be overridden by the invoker
volatile const u32 sampling = 0;
// sets flow fields from IPv4 header information // sets flow fields from IPv4 header information
static inline int fill_iphdr(struct iphdr *ip, void *data_end, struct flow *flow) { static inline int fill_iphdr(struct iphdr *ip, void *data_end, struct flow *flow) {
if ((void *)ip + sizeof(*ip) > data_end) { if ((void *)ip + sizeof(*ip) > data_end) {
...@@ -72,6 +75,12 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, struct flow *f ...@@ -72,6 +75,12 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, struct flow *f
// parses flow information for a given direction (ingress/egress) // parses flow information for a given direction (ingress/egress)
static inline int flow_parse(struct __sk_buff *skb, u8 direction) { static inline int flow_parse(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
return TC_ACT_OK;
}
void *data = (void *)(long)skb->data; void *data = (void *)(long)skb->data;
void *data_end = (void *)(long)skb->data_end; void *data_end = (void *)(long)skb->data_end;
......
...@@ -21,9 +21,7 @@ func main() { ...@@ -21,9 +21,7 @@ func main() {
if err := env.Parse(&config); err != nil { if err := env.Parse(&config); err != nil {
logrus.WithError(err).Fatal("can't load configuration from environment") logrus.WithError(err).Fatal("can't load configuration from environment")
} }
if config.Verbose { setLoggerVerbosity(&config)
logrus.SetLevel(logrus.DebugLevel)
}
logrus.WithField("configuration", fmt.Sprintf("%#v", config)).Debugf("configuration loaded") logrus.WithField("configuration", fmt.Sprintf("%#v", config)).Debugf("configuration loaded")
...@@ -45,3 +43,12 @@ func main() { ...@@ -45,3 +43,12 @@ func main() {
logrus.WithError(err).Fatal("can't start netobserv-ebpf-agent") logrus.WithError(err).Fatal("can't start netobserv-ebpf-agent")
} }
} }
func setLoggerVerbosity(cfg *agent.Config) {
lvl, err := logrus.ParseLevel(cfg.LogLevel)
if err != nil {
logrus.WithError(err).Warn("assuming 'info' logger level as default")
lvl = logrus.InfoLevel
}
logrus.SetLevel(lvl)
}
...@@ -11,6 +11,7 @@ require ( ...@@ -11,6 +11,7 @@ require (
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.1 github.com/stretchr/testify v1.7.1
github.com/vishvananda/netlink v1.1.0 github.com/vishvananda/netlink v1.1.0
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
google.golang.org/grpc v1.45.0 google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.28.0 google.golang.org/protobuf v1.28.0
) )
...@@ -22,7 +23,6 @@ require ( ...@@ -22,7 +23,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/text v0.3.7 // indirect golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
......
...@@ -42,7 +42,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) { ...@@ -42,7 +42,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
} }
tracers := map[string]flowTracer{} tracers := map[string]flowTracer{}
for iface := range interfaces { for iface := range interfaces {
tracers[iface] = ebpf.NewFlowTracer(iface) tracers[iface] = ebpf.NewFlowTracer(iface, cfg.Sampling)
} }
target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort) target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort)
grpcExporter, err := exporter.StartGRPCProto(target) grpcExporter, err := exporter.StartGRPCProto(target)
......
...@@ -28,7 +28,6 @@ func TestFlowsAgent(t *testing.T) { ...@@ -28,7 +28,6 @@ func TestFlowsAgent(t *testing.T) {
flowsAgent, err := FlowsAgent(&Config{ flowsAgent, err := FlowsAgent(&Config{
TargetHost: "127.0.0.1", TargetHost: "127.0.0.1",
TargetPort: port, TargetPort: port,
Verbose: true,
CacheMaxFlows: 1, CacheMaxFlows: 1,
CacheActiveTimeout: 5 * time.Second, CacheActiveTimeout: 5 * time.Second,
BuffersLength: 10, BuffersLength: 10,
......
...@@ -32,8 +32,11 @@ type Config struct { ...@@ -32,8 +32,11 @@ type Config struct {
// CacheActiveTimeout specifies the maximum duration in which a flow is kept in the accounting // CacheActiveTimeout specifies the maximum duration in which a flow is kept in the accounting
// cache before being flushed for its later export // cache before being flushed for its later export
CacheActiveTimeout time.Duration `env:"CACHE_ACTIVE_TIMEOUT" envDefault:"5s"` CacheActiveTimeout time.Duration `env:"CACHE_ACTIVE_TIMEOUT" envDefault:"5s"`
// Verbose logs mode // Logger level. From more to less verbose: trace, debug, info, warn, error, fatal, panic.
Verbose bool `env:"VERBOSE" envDefault:"false"` LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
// Sampling holds the rate at which packets should be sampled and sent to the target collector.
// E.g. if set to 100, one out of 100 packets, on average, will be sent to each target collector.
Sampling uint32 `env:"SAMPLING" envDefault:"0"`
} }
func getInterfaces(cfg *Config, interfaces func() ([]net.Interface, error)) (map[string]struct{}, error) { func getInterfaces(cfg *Config, interfaces func() ([]net.Interface, error)) (map[string]struct{}, error) {
......
No preview for this file type
No preview for this file type
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vishvananda/netlink" "github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
) )
// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile. // $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
...@@ -22,6 +23,8 @@ import ( ...@@ -22,6 +23,8 @@ import (
const ( const (
qdiscType = "clsact" qdiscType = "clsact"
// constants defined in flows.c as "volatile const"
constSampling = "sampling"
) )
var log = logrus.WithField("component", "ebpf.FlowTracer") var log = logrus.WithField("component", "ebpf.FlowTracer")
...@@ -29,6 +32,7 @@ var log = logrus.WithField("component", "ebpf.FlowTracer") ...@@ -29,6 +32,7 @@ var log = logrus.WithField("component", "ebpf.FlowTracer")
// FlowTracer reads and forwards the Flows from the Transmission Control, for a given interface. // FlowTracer reads and forwards the Flows from the Transmission Control, for a given interface.
type FlowTracer struct { type FlowTracer struct {
interfaceName string interfaceName string
sampling uint32
objects bpfObjects objects bpfObjects
qdisc *netlink.GenericQdisc qdisc *netlink.GenericQdisc
egressFilter *netlink.BpfFilter egressFilter *netlink.BpfFilter
...@@ -37,10 +41,11 @@ type FlowTracer struct { ...@@ -37,10 +41,11 @@ type FlowTracer struct {
} }
// NewFlowTracer fo a given interface type // NewFlowTracer fo a given interface type
func NewFlowTracer(iface string) *FlowTracer { func NewFlowTracer(iface string, sampling uint32) *FlowTracer {
log.WithField("iface", iface).Debug("Instantiating flow tracer") log.WithField("iface", iface).Debug("Instantiating flow tracer")
return &FlowTracer{ return &FlowTracer{
interfaceName: iface, interfaceName: iface,
sampling: sampling,
} }
} }
...@@ -53,9 +58,18 @@ func (m *FlowTracer) Register() error { ...@@ -53,9 +58,18 @@ func (m *FlowTracer) Register() error {
if err := rlimit.RemoveMemlock(); err != nil { if err := rlimit.RemoveMemlock(); err != nil {
return fmt.Errorf("removing mem lock: %w", err) return fmt.Errorf("removing mem lock: %w", err)
} }
// Load pre-compiled programs and maps into the kernel. // Load pre-compiled programs and maps into the kernel, and rewrites the configuration
if err := loadBpfObjects(&m.objects, nil); err != nil { spec, err := loadBpf()
return fmt.Errorf("loading objects: %w", err) if err != nil {
return fmt.Errorf("loading BPF data: %w", err)
}
if err := spec.RewriteConstants(map[string]interface{}{
constSampling: m.sampling,
}); err != nil {
return fmt.Errorf("rewriting BPF constants definition: %w", err)
}
if err := spec.LoadAndAssign(&m.objects, nil); err != nil {
return fmt.Errorf("loading and assigning BPF objects: %w", err)
} }
ipvlan, err := netlink.LinkByName(m.interfaceName) ipvlan, err := netlink.LinkByName(m.interfaceName)
if err != nil { if err != nil {
...@@ -110,7 +124,7 @@ func (m *FlowTracer) Register() error { ...@@ -110,7 +124,7 @@ func (m *FlowTracer) Register() error {
LinkIndex: ipvlan.Attrs().Index, LinkIndex: ipvlan.Attrs().Index,
Parent: netlink.HANDLE_MIN_INGRESS, Parent: netlink.HANDLE_MIN_INGRESS,
Handle: netlink.MakeHandle(0, 1), Handle: netlink.MakeHandle(0, 1),
Protocol: 3, Protocol: unix.ETH_P_ALL,
Priority: 1, Priority: 1,
} }
m.ingressFilter = &netlink.BpfFilter{ m.ingressFilter = &netlink.BpfFilter{
......
FROM fedora:35 FROM fedora:35
ARG GOVERSION="1.17.8" ARG GOVERSION="1.17.9"
ARG PROTOCVERSION="3.19.4" ARG PROTOCVERSION="3.19.4"
# Installs dependencies that are required to compile eBPF programs # Installs dependencies that are required to compile eBPF programs
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment