Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
agent.go 8.94 KiB
package agent
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/netobserv/gopipes/pkg/node"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/compress"
"github.com/sirupsen/logrus"
)
// alog is the package-level logger of the agent. Every entry logged through it carries
// the field component="agent.Flows".
var alog = logrus.WithField("component", "agent.Flows")
// Flows reporting agent. It groups the per-interface tracers with the accounter, exporter,
// interfaces informer, interface filter and configuration used by one agent instance.
//
// The struct holds a sync.Mutex by value, so it must not be copied: handle it through a
// pointer, as returned by FlowsAgent.
type Flows struct {
// trMutex provides synchronized access to the tracers map
trMutex sync.Mutex
// tracers stores a flowTracer implementation for each interface in the system, with a
// cancel function that allows stopping it when its interface is deleted
tracers map[ifaces.Name]cancellableTracer
// accounter is the flow.Accounter instance owned by this agent.
// NOTE(review): presumably it aggregates the records forwarded by the tracers before they
// reach the exporter — confirm against pkg/flow.
accounter *flow.Accounter
// exporter is the function that consumes batches of flow records from a channel
// (see the flowExporter type).
exporter flowExporter
// interfaces is the ifaces.Informer held by this agent.
// NOTE(review): presumably the informer for new interfaces that FlowsAgent configures
// according to cfg.ListenInterfaces — confirm.
interfaces ifaces.Informer
// filter is the interface filter held by this agent.
// NOTE(review): presumably the allow/deny filter that FlowsAgent builds from
// cfg.Interfaces and cfg.ExcludeInterfaces — confirm.
filter interfaceFilter
// tracerFactory specifies how to instantiate flowTracer implementations
// NOTE(review): name is presumably the network interface name and sampling the
// sampling value handed to the tracer — confirm against the factory implementations.
tracerFactory func(name string, sampling uint32) flowTracer
// cfg points to the Config held by this agent.
// NOTE(review): presumably the same configuration passed to FlowsAgent — confirm.
cfg *Config
}
// flowTracer abstracts the interface of ebpf.FlowTracer to allow dependency injection in tests
type flowTracer interface {
// Trace receives a context and a send-only channel of flow records.
// NOTE(review): presumably it forwards the traced flows through forwardFlows until ctx is
// cancelled — confirm against ebpf.FlowTracer.
Trace(ctx context.Context, forwardFlows chan<- *flow.Record)
// Register returns a non-nil error on failure.
// NOTE(review): presumably it attaches the tracer to its network interface — confirm.
Register() error
// Unregister returns a non-nil error on failure.
// NOTE(review): presumably it undoes what Register did — confirm.
Unregister() error
}
// cancellableTracer pairs a flowTracer with the context.CancelFunc associated to it, so the
// tracer can be stopped (e.g. when its interface is deleted). It is the value type of the
// Flows.tracers map.
type cancellableTracer struct {
// tracer is the flowTracer instance being tracked
tracer flowTracer
// cancel is the cancel function that allows stopping the tracer
cancel context.CancelFunc
}
// flowExporter abstracts the ExportFlows method of exporter.GRPCProto to allow dependency
// injection in tests. It is a function that takes a receive-only channel whose elements are
// slices of flow records.
type flowExporter func(in <-chan []*flow.Record)
// FlowsAgent instantiates a new agent, given a configuration.
func FlowsAgent(cfg *Config) (*Flows, error) {
alog.Info("initializing Flows agent")
// configure allow/deny interfaces filter
filter, err := initInterfaceFilter(cfg.Interfaces, cfg.ExcludeInterfaces)
if err != nil {
return nil, fmt.Errorf("configuring interface filters: %w", err)
}
// configure informer for new interfaces
var informer ifaces.Informer
switch cfg.ListenInterfaces {
case ListenPoll:
alog.WithField("period", cfg.ListenPollPeriod).
Debug("listening for new interfaces: use polling")
informer = ifaces.NewPoller(cfg.ListenPollPeriod, cfg.BuffersLength)