diff --git a/go.mod b/go.mod index be3f8018480f3cea833a1b40ac1b4416e0769d5a..ffa0cc953aa8cbdb8f344b9c03856913ab3d1bc3 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/cilium/ebpf v0.8.1 github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f - github.com/netobserv/gopipes v0.1.1 + github.com/netobserv/gopipes v0.2.0 github.com/paulbellamy/ratecounter v0.2.0 github.com/segmentio/kafka-go v0.4.32 github.com/sirupsen/logrus v1.8.1 diff --git a/go.sum b/go.sum index 145126043d6499a9418908df55edf2f17983bfda..80ee7a5d679ae235f7fecf7536f7e8217694169a 100644 --- a/go.sum +++ b/go.sum @@ -372,8 +372,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/netobserv/gopipes v0.1.1 h1:f8zJsvnMgRFRa2B+1siwRtW0Y4dqeBROmkcI/HgT1gE= -github.com/netobserv/gopipes v0.1.1/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI= +github.com/netobserv/gopipes v0.2.0 h1:CnJQq32+xNuM85eVYy/HOf+StTJdh2K6RdaEg7NAJDg= +github.com/netobserv/gopipes v0.2.0/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index c21d3b8e827e2cea9b877b19d4e5b2d1ae27cab0..bfd8270c6dd7d84d4180e088bb424540676af377 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ 
-168,11 +168,10 @@ func flowDirections(cfg *Config) (ingress, egress bool) { // until the passed context is canceled func (f *Flows) Run(ctx context.Context) error { alog.Info("starting Flows agent") - tracedRecords, err := f.interfacesManager(ctx) + graph, err := f.processPipeline(ctx) if err != nil { - return err + return fmt.Errorf("starting processing graph: %w", err) } - graph := f.processRecords(tracedRecords) alog.Info("Flows agent successfully started") <-ctx.Done() @@ -187,7 +186,7 @@ func (f *Flows) Run(ctx context.Context) error { // interfacesManager uses an informer to check new/deleted network interfaces. For each running // interface, it registers a flow tracer that will forward new flows to the returned channel -func (f *Flows) interfacesManager(ctx context.Context) (<-chan []*flow.Record, error) { +func (f *Flows) interfacesManager(ctx context.Context) (node.InitFunc, error) { slog := alog.WithField("function", "interfacesManager") slog.Debug("subscribing for network interface events") @@ -196,11 +195,7 @@ func (f *Flows) interfacesManager(ctx context.Context) (<-chan []*flow.Record, e return nil, fmt.Errorf("instantiating interfaces' informer: %w", err) } - tracedRecords := make(chan []*flow.Record, f.cfg.BuffersLength) - tctx, cancelTracer := context.WithCancel(ctx) - go f.tracer.Trace(tctx, tracedRecords) - go func() { for { select { @@ -208,7 +203,6 @@ func (f *Flows) interfacesManager(ctx context.Context) (<-chan []*flow.Record, e slog.Debug("canceling flow tracer") cancelTracer() slog.Debug("closing channel and exiting internal goroutine") - close(tracedRecords) return case event := <-ifaceEvents: slog.WithField("event", event).Debug("received event") @@ -225,25 +219,28 @@ func (f *Flows) interfacesManager(ctx context.Context) (<-chan []*flow.Record, e } }() - return tracedRecords, nil + return func(out chan<- []*flow.Record) { + f.tracer.Trace(tctx, out) + }, nil } -// processRecords creates the tracers --> accounter --> forwarder Flow 
processing graph -func (f *Flows) processRecords(tracedRecords <-chan []*flow.Record) *node.Terminal { - // The start node receives Records from the eBPF flow tracers. Currently it is just an external - // channel forwarder, as the Pipes library does not yet accept - // adding/removing nodes dynamically: https://github.com/mariomac/pipes/issues/5 +// processPipeline creates the tracers --> accounter --> forwarder Flow processing graph +func (f *Flows) processPipeline(ctx context.Context) (*node.Terminal, error) { + alog.Debug("registering tracers' input") - tracersCollector := node.AsInit(func(out chan<- []*flow.Record) { - for i := range tracedRecords { - out <- i - } - }) + tracedRecords, err := f.interfacesManager(ctx) + if err != nil { + return nil, err + } + tracersCollector := node.AsInit(tracedRecords) + alog.Debug("registering exporter") - export := node.AsTerminal(f.exporter) + export := node.AsTerminal(f.exporter, + node.ChannelBufferLen(f.cfg.BuffersLength)) alog.Debug("connecting graph") if f.cfg.Deduper == DeduperFirstCome { - deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry)) + deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry), + node.ChannelBufferLen(f.cfg.BuffersLength)) tracersCollector.SendsTo(deduper) deduper.SendsTo(export) } else { @@ -251,7 +248,7 @@ func (f *Flows) processRecords(tracedRecords <-chan []*flow.Record) *node.Termin } alog.Debug("starting graph") tracersCollector.Start() - return export + return export, nil } func (f *Flows) onInterfaceAdded(iface ifaces.Interface) { diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 95d381d4c83ecce7e9bb8ecac984be4fa2e67ca9..98163b2b610f0886b12b053e5dbd3ed93b9a43ee 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -8,6 +8,7 @@ import ( "io/fs" "strings" "sync" + "sync/atomic" "time" "github.com/cilium/ebpf/ringbuf" @@ -296,10 +297,13 @@ func (m *FlowTracer) pollAndForwardAggregateFlows(ctx context.Context, forwardFl <-ctx.Done() }() go func() { + // flow 
eviction loop. It just keeps waiting for eviction until someone triggers the + // flowsEvictor.Broadcast signal for { // make sure we only evict once at a time, even if there are multiple eviction signals m.flowsEvictor.L.Lock() m.flowsEvictor.Wait() + tlog.Debug("eviction signal received") m.evictFlows(tlog, forwardFlows) m.flowsEvictor.L.Unlock() @@ -315,13 +319,13 @@ func (m *FlowTracer) pollAndForwardAggregateFlows(ctx context.Context, forwardFl for { select { case <-ctx.Done(): - tlog.Debug("evicting flows after context cancelation") + tlog.Debug("triggering flow eviction after context cancelation") m.flowsEvictor.Broadcast() ticker.Stop() tlog.Debug("exiting monitor") return case <-ticker.C: - tlog.Debug("evicting flows on timer") + tlog.Debug("triggering flow eviction on timer") m.flowsEvictor.Broadcast() } } @@ -372,6 +376,8 @@ func (m *FlowTracer) Trace(ctx context.Context, forwardFlows chan<- []*flow.Reco func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlows chan<- []*flow.Record) { flowAccount := make(chan *flow.RawRecord, m.buffersLength) go m.accounter.Account(flowAccount, forwardFlows) + isForwarding := int32(0) + forwardedFlows := int32(0) for { select { case <-ctx.Done(): @@ -393,8 +399,10 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow log.WithError(err).Warn("reading ringbuf event") continue } - log.WithField("direction", readFlow.Direction). - Debug("received flow from ringbuffer. 
Evicting in-memory maps to leave free space") + if logrus.IsLevelEnabled(logrus.DebugLevel) { + m.logRingBufferFlows(&forwardedFlows, &isForwarding) + } + // forces a flow's eviction to leave room for new flows in the ebpf cache m.flowsEvictor.Broadcast() // Will need to send it to accounter anyway to account regardless of complete/ongoing flow @@ -403,12 +411,30 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow } } +// logRingBufferFlows avoids flooding logs on long series of evicted flows by grouping how +// many flows are forwarded +func (m *FlowTracer) logRingBufferFlows(forwardedFlows, isForwarding *int32) { + atomic.AddInt32(forwardedFlows, 1) + if atomic.CompareAndSwapInt32(isForwarding, 0, 1) { + go func() { + time.Sleep(m.evictionTimeout) + log.WithFields(logrus.Fields{ + "flows": atomic.LoadInt32(forwardedFlows), + "cacheMaxFlows": m.cacheMaxSize, + }).Debug("received flows via ringbuffer. You might want to increase the CACHE_MAX_FLOWS value") + atomic.StoreInt32(forwardedFlows, 0) + atomic.StoreInt32(isForwarding, 0) + }() + } +} + // For synchronization purposes, we get/delete a whole snapshot of the flows map. 
// This way we avoid missing packets that could be updated on the // ebpf side while we process/aggregate them here // Changing this method invocation by BatchLookupAndDelete could improve performance // TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively // Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md +// Race conditions here cause some flows to be lost in high-load scenarios func (m *FlowTracer) lookupAndDeleteFlowsMap() map[flow.RecordKey][]flow.RecordMetrics { flowMap := m.objects.AggregatedFlows diff --git a/pkg/flow/account.go b/pkg/flow/account.go index 92b59ef768aa5c5389df6a9e885b256cd07a7948..058fddf7bef13e4ba93269df15117daca90221db 100644 --- a/pkg/flow/account.go +++ b/pkg/flow/account.go @@ -52,6 +52,8 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) { } evictingEntries := c.entries c.entries = make(map[RecordKey]*RecordMetrics, c.maxEntries) + logrus.WithField("flows", len(evictingEntries)). + Debug("evicting flows from userspace accounter on timeout") go c.evict(evictingEntries, out) case record, ok := <-in: if !ok { @@ -69,12 +71,13 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) { if len(c.entries) >= c.maxEntries { evictingEntries := c.entries c.entries = make(map[RecordKey]*RecordMetrics, c.maxEntries) + logrus.WithField("flows", len(evictingEntries)). 
+ Debug("evicting flows from userspace accounter after reaching cache max length") go c.evict(evictingEntries, out) } c.entries[record.RecordKey] = &record.RecordMetrics } } - } } diff --git a/vendor/github.com/netobserv/gopipes/pkg/node/node.go b/vendor/github.com/netobserv/gopipes/pkg/node/node.go index 931def963b9fe0da9850a631027de7f050e4ac65..1c78909cca7e0f49f338736ec28246941784c306 100644 --- a/vendor/github.com/netobserv/gopipes/pkg/node/node.go +++ b/vendor/github.com/netobserv/gopipes/pkg/node/node.go @@ -11,9 +11,6 @@ import ( "github.com/netobserv/gopipes/pkg/internal/refl" ) -// todo: make it configurable -const chBufLen = 20 - // InitFunc is a function that receives a writable channel as unique argument, and sends // value to that channel during an indefinite amount of time. // TODO: with Go 1.18, this will be @@ -151,9 +148,10 @@ func AsInit(fun InitFunc) *Init { } } -// AsMiddle wraps an MiddleFunc into an Middle node. +// AsMiddle wraps an MiddleFunc into an Middle node, allowing to configure some instantiation +// parameters by means of an optional list of node.CreationOption. // It panics if the MiddleFunc does not follow the func(<-chan,chan<-) signature. -func AsMiddle(fun MiddleFunc) *Middle { +func AsMiddle(fun MiddleFunc, opts ...CreationOption) *Middle { fn := refl.WrapFunction(fun) // check that the arguments are a read channel and a write channel fn.AssertNumberOfArguments(2) @@ -165,17 +163,19 @@ func AsMiddle(fun MiddleFunc) *Middle { if !outCh.CanSend() { panic(fn.String() + " second argument should be a writable channel") } + options := getOptions(opts...) return &Middle{ - inputs: connect.NewJoiner(inCh, chBufLen), + inputs: connect.NewJoiner(inCh, options.channelBufferLen), fun: fn, inType: inCh.ElemType(), outType: outCh.ElemType(), } } -// AsTerminal wraps a TerminalFunc into a Terminal node. 
+// AsTerminal wraps a TerminalFunc into a Terminal node, allowing to configure some instantiation +// parameters by means of an optional list of node.CreationOption. // It panics if the TerminalFunc does not follow the func(<-chan) signature. -func AsTerminal(fun TerminalFunc) *Terminal { +func AsTerminal(fun TerminalFunc, opts ...CreationOption) *Terminal { fn := refl.WrapFunction(fun) // check that the arguments are only a read channel fn.AssertNumberOfArguments(1) @@ -183,8 +183,9 @@ func AsTerminal(fun TerminalFunc) *Terminal { if !inCh.CanReceive() { panic(fn.String() + " first argument should be a readable channel") } + options := getOptions(opts...) return &Terminal{ - inputs: connect.NewJoiner(inCh, chBufLen), + inputs: connect.NewJoiner(inCh, options.channelBufferLen), fun: fn, done: make(chan struct{}), inType: inCh.ElemType(), @@ -244,3 +245,11 @@ func assertChannelsCompatibility(srcInputType refl.ChannelType, outputs []Receiv } } } + +func getOptions(opts ...CreationOption) creationOptions { + options := defaultOptions + for _, opt := range opts { + opt(&options) + } + return options +} diff --git a/vendor/github.com/netobserv/gopipes/pkg/node/options.go b/vendor/github.com/netobserv/gopipes/pkg/node/options.go new file mode 100644 index 0000000000000000000000000000000000000000..49c47935bc198921882c13d53e2acb38f6cbf709 --- /dev/null +++ b/vendor/github.com/netobserv/gopipes/pkg/node/options.go @@ -0,0 +1,22 @@ +package node + +type creationOptions struct { + // if 0, channel is unbuffered + channelBufferLen int +} + +var defaultOptions = creationOptions{ + channelBufferLen: 0, +} + +// CreationOption allows overriding the default values of node instantiation +type CreationOption func(options *creationOptions) + +// ChannelBufferLen is a node.CreationOption that allows specifying the length of the input +// channels for a given node. The default value is 0, which means that the channels +// are unbuffered. 
+func ChannelBufferLen(length int) CreationOption { + return func(options *creationOptions) { + options.channelBufferLen = length + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index fa8bed550a0ecbf837789c0865a1d8678c513d7e..7be3813995f3cb240615856027f7b074d3cfcf3d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -106,7 +106,7 @@ github.com/modern-go/reflect2 # github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 ## explicit github.com/munnerz/goautoneg -# github.com/netobserv/gopipes v0.1.1 +# github.com/netobserv/gopipes v0.2.0 ## explicit; go 1.17 github.com/netobserv/gopipes/pkg/internal/connect github.com/netobserv/gopipes/pkg/internal/refl