Skip to content
Snippets Groups Projects
Select Git revision
  • 036919cec5e9431903b0f75a4b1964cdf08b05e8
  • cicd default
  • main protected
  • konflux/mintmaker/main/go-modules
  • konflux/mintmaker/main/major-go-modules
  • konflux/mintmaker/main/github.com-netsampler-goflow2-2.x
  • konflux/mintmaker/main/github.com-golang-snappy-1.x
  • konflux/mintmaker/main/github.com-cenkalti-rpc2-1.x
  • konflux/mintmaker/main/github.com-cenkalti-backoff-v4-5.x
  • konflux/mintmaker/main/golang.org-x-sys-0.x
  • konflux/mintmaker/main/github.com-vladimirvivien-gexe-0.x
  • konflux/mintmaker/main/github.com-fsnotify-fsnotify-1.x
  • konflux/mintmaker/main/opentelemetry-go-monorepo
  • konflux/mintmaker/main/sigs.k8s.io-structured-merge-diff-v4-4.x
  • konflux/mintmaker/main/lukechampine.com-uint128-1.x
  • konflux/mintmaker/main/golang.org-x-time-0.x
  • konflux/mintmaker/main/golang.org-x-text-0.x
  • konflux/mintmaker/main/golang.org-x-term-0.x
  • konflux/mintmaker/main/golang.org-x-oauth2-0.x
  • konflux/mintmaker/main/golang.org-x-net-0.x
  • konflux/mintmaker/main/golang.org-x-crypto-0.x
  • v1.9.1-community
  • v1.9.0-community
  • v1.9.0-crc0
  • v1.8.2-community
  • v1.8.1-community
  • v1.8.1-crc0
  • v1.8.0-community
  • v1.8.0-crc0
  • v1.7.0-community
  • v1.6.2-community
  • v1.6.1-community
  • v1.6.1-crc2
  • v1.6.1-crc1
  • v1.6.1-crc0
  • v1.6.0-community
  • v1.6.0-crc0
  • v0.3.3
  • v0.3.3-rc0
  • v0.3.2
  • v0.3.2-rc0
41 results

agent.go

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    agent.go 8.94 KiB
    package agent
    
    import (
    	"context"
    	"errors"
    	"fmt"
    	"sync"
    	"time"
    
    	"github.com/netobserv/gopipes/pkg/node"
    	"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
    	"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
    	"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
    	"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
    	kafkago "github.com/segmentio/kafka-go"
    	"github.com/segmentio/kafka-go/compress"
    	"github.com/sirupsen/logrus"
    )
    
    var alog = logrus.WithField("component", "agent.Flows")
    
    // Flows reporting agent
    type Flows struct {
    	// trMutex provides synchronized access to the tracers map
    	trMutex sync.Mutex
    	// tracers stores a flowTracer implementation for each interface in the system, with a
    	// cancel function that allows stopping it when its interface is deleted
    	tracers    map[ifaces.Name]cancellableTracer
    	accounter  *flow.Accounter
    	exporter   flowExporter
    	interfaces ifaces.Informer
    	filter     interfaceFilter
    	// tracerFactory specifies how to instantiate flowTracer implementations
    	tracerFactory func(name string, sampling uint32) flowTracer
    	cfg           *Config
    }
    
    // flowTracer abstracts the interface of ebpf.FlowTracer to allow dependency injection in tests
    type flowTracer interface {
    	Trace(ctx context.Context, forwardFlows chan<- *flow.Record)
    	Register() error
    	Unregister() error
    }
    
    type cancellableTracer struct {
    	tracer flowTracer
    	cancel context.CancelFunc
    }
    
    // flowExporter abstract the ExportFlows' method of exporter.GRPCProto to allow dependency injection
    // in tests
    type flowExporter func(in <-chan []*flow.Record)
    
    // FlowsAgent instantiates a new agent, given a configuration.
    func FlowsAgent(cfg *Config) (*Flows, error) {
    	alog.Info("initializing Flows agent")
    
    	// configure allow/deny interfaces filter
    	filter, err := initInterfaceFilter(cfg.Interfaces, cfg.ExcludeInterfaces)
    	if err != nil {
    		return nil, fmt.Errorf("configuring interface filters: %w", err)
    	}
    
    	// configure informer for new interfaces
    	var informer ifaces.Informer
    	switch cfg.ListenInterfaces {
    	case ListenPoll:
    		alog.WithField("period", cfg.ListenPollPeriod).
    			Debug("listening for new interfaces: use polling")
    		informer = ifaces.NewPoller(cfg.ListenPollPeriod, cfg.BuffersLength)