diff --git a/bpf/flows.c b/bpf/flows.c index 15e1fba12b5e8c7c1d1f23c8f851c08f89fbefc4..3b7016f927adb5dc0bcdc829f97048d1e11ad883 100644 --- a/bpf/flows.c +++ b/bpf/flows.c @@ -23,6 +23,9 @@ struct { __uint(max_entries, 1 << 24); } flows SEC(".maps"); +// Constant definitions, to be overridden by the invoker +volatile const u32 sampling = 0; + // sets flow fields from IPv4 header information static inline int fill_iphdr(struct iphdr *ip, void *data_end, struct flow *flow) { if ((void *)ip + sizeof(*ip) > data_end) { @@ -72,6 +75,12 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, struct flow *f // parses flow information for a given direction (ingress/egress) static inline int flow_parse(struct __sk_buff *skb, u8 direction) { + + // If sampling is defined, will only parse 1 out of "sampling" flows + if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) { + return TC_ACT_OK; + } + void *data = (void *)(long)skb->data; void *data_end = (void *)(long)skb->data_end; diff --git a/cmd/netobserv-ebpf-agent.go b/cmd/netobserv-ebpf-agent.go index 49618bbe9d94a094cabe09bdf96cd03834aab576..310eda2e5f969d3399afad26b37b0c380335c633 100644 --- a/cmd/netobserv-ebpf-agent.go +++ b/cmd/netobserv-ebpf-agent.go @@ -21,9 +21,7 @@ func main() { if err := env.Parse(&config); err != nil { logrus.WithError(err).Fatal("can't load configuration from environment") } - if config.Verbose { - logrus.SetLevel(logrus.DebugLevel) - } + setLoggerVerbosity(&config) logrus.WithField("configuration", fmt.Sprintf("%#v", config)).Debugf("configuration loaded") @@ -45,3 +43,12 @@ func main() { logrus.WithError(err).Fatal("can't start netobserv-ebpf-agent") } } + +func setLoggerVerbosity(cfg *agent.Config) { + lvl, err := logrus.ParseLevel(cfg.LogLevel) + if err != nil { + logrus.WithError(err).Warn("assuming 'info' logger level as default") + lvl = logrus.InfoLevel + } + logrus.SetLevel(lvl) +} diff --git a/go.mod b/go.mod index 8015af8fd79b04adf8003007c7733e643a4d4514..5a7c845110385562638ff74a213ba385d273c529 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.1 github.com/vishvananda/netlink v1.1.0 + golang.org/x/sys v0.0.0-20220412211240-33da011f77ad google.golang.org/grpc v1.45.0 google.golang.org/protobuf v1.28.0 ) @@ -22,7 +23,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect - golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index d27598c6dd1d4a00f754b12b54c553495ae63178..45869c0995d6812e86dd14a925b76c417eb4f0f1 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -42,7 +42,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) { } tracers := map[string]flowTracer{} for iface := range interfaces { - tracers[iface] = ebpf.NewFlowTracer(iface) + tracers[iface] = ebpf.NewFlowTracer(iface, cfg.Sampling) } target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort) grpcExporter, err := exporter.StartGRPCProto(target) diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 368b8c94186338b3ebf0f1b6b3adba1dbe0c2861..2a90b473901898823aefd9835ab032efbbda62f0 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -28,7 +28,6 @@ func TestFlowsAgent(t *testing.T) { flowsAgent, err := FlowsAgent(&Config{ TargetHost: "127.0.0.1", TargetPort: port, - Verbose: true, CacheMaxFlows: 1, CacheActiveTimeout: 5 * time.Second, BuffersLength: 10, diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 3f51807c5ad5180fe39ebab0d356c70fd8a9babf..7bb25c93d5fcaf932bc6197c3de1b6424e2880f3 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -32,8 +32,11 @@ type Config struct { // CacheActiveTimeout specifies the maximum duration in which a flow is kept in the accounting // cache before being flushed for its later export CacheActiveTimeout time.Duration `env:"CACHE_ACTIVE_TIMEOUT" envDefault:"5s"` - // Verbose logs mode - Verbose bool `env:"VERBOSE" envDefault:"false"` + // Logger level. From more to less verbose: trace, debug, info, warn, error, fatal, panic. + LogLevel string `env:"LOG_LEVEL" envDefault:"info"` + // Sampling holds the rate at which packets should be sampled and sent to the target collector. + // E.g. if set to 100, one out of 100 packets, on average, will be sent to each target collector. + Sampling uint32 `env:"SAMPLING" envDefault:"0"` } func getInterfaces(cfg *Config, interfaces func() ([]net.Interface, error)) (map[string]struct{}, error) { diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o index 208ca53bf2294b8a1b30f720307c32dcf618b0bb..8219d6f33c60b495fcff888e892948a113765a7b 100644 Binary files a/pkg/ebpf/bpf_bpfeb.o and b/pkg/ebpf/bpf_bpfeb.o differ diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o index 81252eea91e8758f00fa67fa5bed397f718cd129..034b40a641efeae375398cfecd0c53277554a14c 100644 Binary files a/pkg/ebpf/bpf_bpfel.o and b/pkg/ebpf/bpf_bpfel.o differ diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 5c962babe5de4d2918bdd9d98e4b9ebc6cfef831..6de1983892eb5ec0ce0454ebd3015b7faf7e8151 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -15,6 +15,7 @@ import ( "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" "github.com/sirupsen/logrus" "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" ) // $BPF_CLANG and $BPF_CFLAGS are set by the Makefile. @@ -22,6 +23,8 @@ import ( const ( qdiscType = "clsact" + // constants defined in flows.c as "volatile const" + constSampling = "sampling" ) var log = logrus.WithField("component", "ebpf.FlowTracer") @@ -29,6 +32,7 @@ var log = logrus.WithField("component", "ebpf.FlowTracer") // FlowTracer reads and forwards the Flows from the Transmission Control, for a given interface. type FlowTracer struct { interfaceName string + sampling uint32 objects bpfObjects qdisc *netlink.GenericQdisc egressFilter *netlink.BpfFilter @@ -37,10 +41,11 @@ type FlowTracer struct { } // NewFlowTracer fo a given interface type -func NewFlowTracer(iface string) *FlowTracer { +func NewFlowTracer(iface string, sampling uint32) *FlowTracer { log.WithField("iface", iface).Debug("Instantiating flow tracer") return &FlowTracer{ interfaceName: iface, + sampling: sampling, } } @@ -53,9 +58,18 @@ func (m *FlowTracer) Register() error { if err := rlimit.RemoveMemlock(); err != nil { return fmt.Errorf("removing mem lock: %w", err) } - // Load pre-compiled programs and maps into the kernel. - if err := loadBpfObjects(&m.objects, nil); err != nil { - return fmt.Errorf("loading objects: %w", err) + // Load pre-compiled programs and maps into the kernel, and rewrites the configuration + spec, err := loadBpf() + if err != nil { + return fmt.Errorf("loading BPF data: %w", err) + } + if err := spec.RewriteConstants(map[string]interface{}{ + constSampling: m.sampling, + }); err != nil { + return fmt.Errorf("rewriting BPF constants definition: %w", err) + } + if err := spec.LoadAndAssign(&m.objects, nil); err != nil { + return fmt.Errorf("loading and assigning BPF objects: %w", err) } ipvlan, err := netlink.LinkByName(m.interfaceName) if err != nil { @@ -110,7 +124,7 @@ func (m *FlowTracer) Register() error { LinkIndex: ipvlan.Attrs().Index, Parent: netlink.HANDLE_MIN_INGRESS, Handle: netlink.MakeHandle(0, 1), - Protocol: 3, + Protocol: unix.ETH_P_ALL, Priority: 1, } m.ingressFilter = &netlink.BpfFilter{ diff --git a/scripts/generators.Dockerfile b/scripts/generators.Dockerfile index eb33ce617254276c54e9bc8792be2d54a9b0c2c2..7fdcf34c73c6f387e4735bd1dadec718d00ae729 100644 --- a/scripts/generators.Dockerfile +++ b/scripts/generators.Dockerfile @@ -1,6 +1,6 @@ FROM fedora:35 -ARG GOVERSION="1.17.8" +ARG GOVERSION="1.17.9" ARG PROTOCVERSION="3.19.4" # Installs dependencies that are required to compile eBPF programs