diff --git a/docs/architecture.md b/docs/architecture.md
index 860c63c7e9782ff52068595b192897b8eebaf872..201d913f9d97f3eeb9a268774b0963bf4f1883d9 100644
--- a/docs/architecture.md
+++ b/docs/architecture.md
@@ -12,14 +12,15 @@ flowchart TD
     E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
     E <--> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
 
-    RB --> |*flow.Record| ACC(flow.Accounter)
-
-    ACC --> |"[]*flow.Record"| DD(flow.Deduper)
-    M --> |"[]*flow.Record"| DD
+    RB --> |chan *flow.Record| ACC(flow.Accounter)
+    ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
+    M --> |"chan []*flow.Record"| DD
 
     subgraph Optional
         DD
     end
 
-    DD --> |"[]*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
+    DD --> |"chan []*flow.Record"| CL(flow.CapacityLimiter)
+
+    CL --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
 ```
\ No newline at end of file
diff --git a/docs/config.md b/docs/config.md
index ec2c334e8addc99e7b00f857b82aeef4233b6950..aa4bc815ddbab9a4119fd3a4151347dce79b4e9f 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -59,6 +59,10 @@ so no user should need to change them.
 * `BUFFERS_LENGTH` (default: `50`). Length of the internal communication channels between the
   different processing stages.
+* `EXPORTER_BUFFER_LENGTH` (default: the value of `BUFFERS_LENGTH`). Length of the buffer of
+  flow batches (not individual flows) that can accumulate before the Kafka or GRPC exporter.
+  When this buffer is full (e.g. because the Kafka or GRPC endpoint is slow), incoming flow
+  batches are dropped.
 * `KAFKA_ASYNC` (default: `true`). If `true`, the message writing process will never block. It
   also means that errors are ignored since the caller will not receive the returned value.
 * `LISTEN_INTERFACES` (default: `watch`).
   Mechanism used by the agent to listen for added or removed
diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index 263e62188b792bff87c381feadc3ae3977f18180..635e0ca86f06ef487799e99069fb258c03e27cf0 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -265,9 +265,17 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
 	accounter := node.AsMiddle(f.accounter.Account,
 		node.ChannelBufferLen(f.cfg.BuffersLength))
 
-	export := node.AsTerminal(f.exporter,
+	limiter := node.AsMiddle((&flow.CapacityLimiter{}).Limit,
 		node.ChannelBufferLen(f.cfg.BuffersLength))
+
+	ebl := f.cfg.ExporterBufferLength
+	if ebl == 0 {
+		ebl = f.cfg.BuffersLength
+	}
+
+	export := node.AsTerminal(f.exporter,
+		node.ChannelBufferLen(ebl))
 
 	rbTracer.SendsTo(accounter)
 
 	if f.cfg.Deduper == DeduperFirstCome {
@@ -275,11 +283,12 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
 			node.ChannelBufferLen(f.cfg.BuffersLength))
 		mapTracer.SendsTo(deduper)
 		accounter.SendsTo(deduper)
-		deduper.SendsTo(export)
+		deduper.SendsTo(limiter)
 	} else {
-		mapTracer.SendsTo(export)
-		accounter.SendsTo(export)
+		mapTracer.SendsTo(limiter)
+		accounter.SendsTo(limiter)
 	}
+	limiter.SendsTo(export)
 	alog.Debug("starting graph")
 	mapTracer.Start()
 	rbTracer.Start()
diff --git a/pkg/agent/config.go b/pkg/agent/config.go
index 3e6d1bc9eebeac631050e34803f1a2c726caa5d4..03b67c9a43b5b1dbaf60e5e528ef7f363afa3988 100644
--- a/pkg/agent/config.go
+++ b/pkg/agent/config.go
@@ -35,6 +35,11 @@ type Config struct {
 	// BuffersLength establishes the length of communication channels between the different processing
 	// stages
 	BuffersLength int `env:"BUFFERS_LENGTH" envDefault:"50"`
+	// ExporterBufferLength establishes the length of the buffer of flow batches (not individual
+	// flows) that can accumulate before the Kafka or GRPC exporter. When this buffer is full
+	// (e.g. because the Kafka or GRPC endpoint is slow), incoming flow batches are dropped.
+	// If unset, it takes the value of the BUFFERS_LENGTH property.
+	ExporterBufferLength int `env:"EXPORTER_BUFFER_LENGTH"`
 	// CacheMaxFlows specifies how many flows can be accumulated in the accounting cache before
 	// being flushed for its later export
 	CacheMaxFlows int `env:"CACHE_MAX_FLOWS" envDefault:"5000"`
diff --git a/pkg/flow/limiter.go b/pkg/flow/limiter.go
new file mode 100644
index 0000000000000000000000000000000000000000..5700a78b97d7e966576e2f9b07d105b29705f02b
--- /dev/null
+++ b/pkg/flow/limiter.go
@@ -0,0 +1,59 @@
+package flow
+
+import (
+	"time"
+
+	"github.com/sirupsen/logrus"
+)
+
+const initialLogPeriod = time.Minute
+const maxLogPeriod = time.Hour
+
+var cllog = logrus.WithField("component", "capacity.Limiter")
+
+// CapacityLimiter forwards the flows between two nodes but checks the status of the destination
+// node's buffered channel. If it is already full, it drops the incoming flows and periodically
+// logs a message about the number of lost flows.
+type CapacityLimiter struct {
+	droppedFlows int
+}
+
+func (c *CapacityLimiter) Limit(in <-chan []*Record, out chan<- []*Record) {
+	go c.logDroppedFlows()
+	for i := range in {
+		if len(out) < cap(out) {
+			out <- i
+		} else {
+			c.droppedFlows += len(i)
+		}
+	}
+}
+
+func (c *CapacityLimiter) logDroppedFlows() {
+	logPeriod := initialLogPeriod
+	debugging := logrus.IsLevelEnabled(logrus.DebugLevel)
+	for {
+		time.Sleep(logPeriod)
+
+		// a race condition might happen in this counter but it's not important, as it's just for
+		// logging purposes
+		df := c.droppedFlows
+		if df > 0 {
+			c.droppedFlows = 0
+			cllog.Warnf("%d flows were dropped during the last %s because the agent is forwarding "+
+				"more flows than the remote ingestor is able to process. You might "+
+				"want to increase the CACHE_MAX_FLOWS and CACHE_ACTIVE_TIMEOUT properties",
+				df, logPeriod)
+
+			// if debug logs are disabled, back off to avoid flooding the log with warning messages
+			if !debugging && logPeriod < maxLogPeriod {
+				logPeriod *= 2
+				if logPeriod > maxLogPeriod {
+					logPeriod = maxLogPeriod
+				}
+			}
+		} else {
+			logPeriod = initialLogPeriod
+		}
+	}
+}
diff --git a/pkg/flow/limiter_test.go b/pkg/flow/limiter_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..a2cb510d1bcb75c4ccbeb8659e9140915321be85
--- /dev/null
+++ b/pkg/flow/limiter_test.go
@@ -0,0 +1,87 @@
+package flow
+
+import (
+	"strconv"
+	"testing"
+
+	"github.com/netobserv/gopipes/pkg/node"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+const limiterLen = 50
+
+func TestCapacityLimiter_NoDrop(t *testing.T) {
+	// GIVEN a limiter-enabled pipeline
+	pipeIn, pipeOut := capacityLimiterPipe()
+
+	// WHEN it buffers fewer elements than its maximum capacity
+	for i := 0; i < 33; i++ {
+		pipeIn <- []*Record{{Interface: strconv.Itoa(i)}}
+	}
+
+	// THEN it is able to retrieve all the buffered elements
+	for i := 0; i < 33; i++ {
+		elem := <-pipeOut
+		require.Len(t, elem, 1)
+		assert.Equal(t, strconv.Itoa(i), elem[0].Interface)
+	}
+
+	// AND not a single extra element
+	select {
+	case elem := <-pipeOut:
+		assert.Failf(t, "unexpected element", "%#v", elem)
+	default:
+		// ok!
+	}
+}
+
+func TestCapacityLimiter_Drop(t *testing.T) {
+	// GIVEN a limiter-enabled pipeline
+	pipeIn, pipeOut := capacityLimiterPipe()
+
+	// WHEN it receives more elements than its maximum capacity
+	// (it's not blocking)
+	for i := 0; i < limiterLen*2; i++ {
+		pipeIn <- []*Record{{Interface: strconv.Itoa(i)}}
+	}
+
+	// THEN it is only able to retrieve the first n buffered elements
+	// (plus the single element that is buffered in the output channel)
+	for i := 0; i < limiterLen+1; i++ {
+		elem := <-pipeOut
+		require.Len(t, elem, 1)
+		assert.Equal(t, strconv.Itoa(i), elem[0].Interface)
+	}
+
+	// BUT not a single extra element
+	select {
+	case elem := <-pipeOut:
+		assert.Failf(t, "unexpected element", "%#v", elem)
+	default:
+		// ok!
+	}
+}
+
+func capacityLimiterPipe() (in chan<- []*Record, out <-chan []*Record) {
+	inCh, outCh := make(chan []*Record), make(chan []*Record)
+
+	init := node.AsInit(func(initOut chan<- []*Record) {
+		for i := range inCh {
+			initOut <- i
+		}
+	})
+	limiter := node.AsMiddle((&CapacityLimiter{}).Limit)
+	term := node.AsTerminal(func(termIn <-chan []*Record) {
+		for i := range termIn {
+			outCh <- i
+		}
+	}, node.ChannelBufferLen(limiterLen))
+
+	init.SendsTo(limiter)
+	limiter.SendsTo(term)
+
+	init.Start()
+
+	return inCh, outCh
+}
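
For reference, the drop-when-full pattern that `CapacityLimiter.Limit` relies on can be exercised outside the gopipes pipeline. Below is a minimal, standalone sketch of the same idea; the `record` type, the `limit` helper, the channel sizes, and the batch counts are illustrative stand-ins for this note, not part of the patch. The key property: since `Limit` is the only sender on `out`, `len(out)` can only shrink concurrently (a consumer draining it), so a send that passes the `len(out) < cap(out)` check can never block; at worst a batch that could have fit is counted as dropped.

```go
package main

import "fmt"

// record stands in for flow.Record; only the field used by the demo is kept.
type record struct{ iface string }

// limit mirrors CapacityLimiter.Limit: it forwards batches from in to out,
// but when out's buffer is already full it counts the batch's flows as
// dropped instead of blocking.
func limit(in <-chan []*record, out chan<- []*record) (droppedFlows int) {
	for batch := range in {
		if len(out) < cap(out) {
			out <- batch // cannot block: we are the only sender
		} else {
			droppedFlows += len(batch)
		}
	}
	close(out) // demo-only: the real Limit leaves the channel open
	return droppedFlows
}

func main() {
	in := make(chan []*record)
	out := make(chan []*record, 2) // tiny buffer so drops are easy to trigger

	dropped := make(chan int)
	go func() { dropped <- limit(in, out) }()

	// Send 5 single-flow batches while nothing reads from out:
	// the first cap(out) batches are buffered, the rest are dropped.
	for i := 0; i < 5; i++ {
		in <- []*record{{iface: fmt.Sprintf("eth%d", i)}}
	}
	close(in)

	fmt.Println("dropped flows:", <-dropped) // prints: dropped flows: 3
	for batch := range out {
		fmt.Println("forwarded:", batch[0].iface) // eth0, eth1
	}
}
```

The same reasoning explains the trade-off behind `EXPORTER_BUFFER_LENGTH`: a longer exporter buffer spends more memory to absorb longer ingestor stalls before `Limit` starts discarding batches.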