Skip to content
Snippets Groups Projects
Unverified Commit c6e207d0 authored by Mario Macias's avatar Mario Macias Committed by GitHub
Browse files

Allow selecting flows' direction (#58)

* Allow selecting flows' direction

* Fix typo

* make linter happy
parent 24af5a52
No related branches found
No related tags found
No related merge requests found
......@@ -29,6 +29,8 @@ The following environment variables are available to configure the NetObserv eBF
deduplicator. After a flow hasn't been received for that expiry time, the deduplicator forgets it.
That means that a flow from a connection that has been inactive during that period could be
forwarded again from a different interface.
* `DIRECTION` (default: `both`). Allows selecting which flows to trace according to its direction.
Accepted values are `ingress`, `egress` or `both`.
* `LOG_LEVEL` (default: `info`). From more to less verbose: `trace`, `debug`, `info`, `warn`,
`error`, `fatal`, `panic`.
* `KAFKA_BROKERS` (required if `EXPORT` is `kafka`). Comma-separated list of tha addresses of the
......
......@@ -5,5 +5,5 @@ metadata:
labels:
strimzi.io/cluster: "kafka-cluster"
spec:
partitions: 1
partitions: 12
replicas: 1
\ No newline at end of file
......@@ -130,8 +130,11 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
return iface
}
ingress, egress := flowDirections(cfg)
tracer, err := ebpf.NewFlowTracer(
cfg.Sampling, cfg.CacheMaxFlows, cfg.BuffersLength, cfg.CacheActiveTimeout,
ingress, egress,
interfaceNamer,
)
if err != nil {
......@@ -147,6 +150,20 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
}, nil
}
func flowDirections(cfg *Config) (ingress, egress bool) {
switch cfg.Direction {
case DirectionIngress:
return true, false
case DirectionEgress:
return false, true
case DirectionBoth:
return true, true
default:
alog.Warnf("unknown DIRECTION %q. Tracing both ingress and egress traffic", cfg.Direction)
return true, true
}
}
// Run a Flows agent. The function will keep running in the same thread
// until the passed context is canceled
func (f *Flows) Run(ctx context.Context) error {
......
......@@ -9,6 +9,9 @@ const (
ListenWatch = "watch"
DeduperNone = "none"
DeduperFirstCome = "firstCome"
DirectionIngress = "ingress"
DirectionEgress = "egress"
DirectionBoth = "both"
)
type Config struct {
......@@ -49,6 +52,9 @@ type Config struct {
// again from a different interface.
// If the value is not set, it will default to 2 * CacheActiveTimeout
DeduperFCExpiry time.Duration `env:"DEDUPER_FC_EXPIRY"`
// Direction allows selecting which flows to trace according to its direction. Accepted values
// are "ingress", "egress" or "both" (default).
Direction string `env:"DIRECTION" envDefault:"both"`
// Logger level. From more to less verbose: trace, debug, info, warn, error, fatal, panic.
LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
// Sampling holds the rate at which packets should be sampled and sent to the target collector.
......
......@@ -47,12 +47,15 @@ type FlowTracer struct {
flowsEvictor *sync.Cond
lastEvictionNs uint64
cacheMaxSize int
enableIngress bool
enableEgress bool
}
// TODO: decouple flowtracer logic from eBPF maps access so we can inject mocks for testing
func NewFlowTracer(
sampling, cacheMaxSize, buffersLength int,
evictionTimeout time.Duration,
ingress, egress bool,
namer flow.InterfaceNamer,
) (*FlowTracer, error) {
if err := rlimit.RemoveMemlock(); err != nil {
......@@ -96,6 +99,8 @@ func NewFlowTracer(
flowsEvictor: sync.NewCond(&sync.Mutex{}),
lastEvictionNs: uint64(monotime.Now()),
cacheMaxSize: cacheMaxSize,
enableIngress: ingress,
enableEgress: egress,
}, nil
}
......@@ -129,6 +134,23 @@ func (m *FlowTracer) Register(iface ifaces.Interface) error {
}
m.qdiscs[iface] = qdisc
if err := m.registerEgress(iface, ipvlan); err != nil {
return err
}
if err := m.registerIngress(iface, ipvlan); err != nil {
return err
}
return nil
}
func (m *FlowTracer) registerEgress(iface ifaces.Interface, ipvlan netlink.Link) error {
ilog := log.WithField("iface", iface)
if !m.enableEgress {
ilog.Debug("ignoring egress traffic, according to user configuration")
return nil
}
// Fetch events on egress
egressAttrs := netlink.FilterAttrs{
LinkIndex: ipvlan.Attrs().Index,
......@@ -146,7 +168,7 @@ func (m *FlowTracer) Register(iface ifaces.Interface) error {
if err := netlink.FilterDel(egressFilter); err == nil {
ilog.Warn("egress filter already existed. Deleted it")
}
if err = netlink.FilterAdd(egressFilter); err != nil {
if err := netlink.FilterAdd(egressFilter); err != nil {
if errors.Is(err, fs.ErrExist) {
ilog.WithError(err).Warn("egress filter already exists. Ignoring")
} else {
......@@ -154,7 +176,15 @@ func (m *FlowTracer) Register(iface ifaces.Interface) error {
}
}
m.egressFilters[iface] = egressFilter
return nil
}
func (m *FlowTracer) registerIngress(iface ifaces.Interface, ipvlan netlink.Link) error {
ilog := log.WithField("iface", iface)
if !m.enableIngress {
ilog.Debug("ignoring ingress traffic, according to user configuration")
return nil
}
// Fetch events on ingress
ingressAttrs := netlink.FilterAttrs{
LinkIndex: ipvlan.Attrs().Index,
......@@ -172,7 +202,7 @@ func (m *FlowTracer) Register(iface ifaces.Interface) error {
if err := netlink.FilterDel(ingressFilter); err == nil {
ilog.Warn("ingress filter already existed. Deleted it")
}
if err = netlink.FilterAdd(ingressFilter); err != nil {
if err := netlink.FilterAdd(ingressFilter); err != nil {
if errors.Is(err, fs.ErrExist) {
ilog.WithError(err).Warn("ingress filter already exists. Ignoring")
} else {
......@@ -376,9 +406,9 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow
// For synchronization purposes, we get/delete a whole snapshot of the flows map.
// This way we avoid missing packets that could be updated on the
// ebpf side while we process/aggregate them here
// Changing this method invaction by BatchLookupAndDelete could improve performance
// Changing this method invocation by BatchLookupAndDelete could improve performance
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete oprations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
func (m *FlowTracer) lookupAndDeleteFlowsMap() map[flow.RecordKey][]flow.RecordMetrics {
flowMap := m.objects.AggregatedFlows
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment