Unverified commit 689b24db authored by Joel Takvorian, committed by GitHub

Merge pull request #63 from mariomac/capacity-limiter

NETOBSERV-613: drop messages when they accumulate in the exporter
parents f63d1045 1f065754
@@ -12,14 +12,15 @@ flowchart TD
     E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
     E <--> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
-    RB --> |*flow.Record| ACC(flow.Accounter)
-    ACC --> |"[]*flow.Record"| DD(flow.Deduper)
-    M --> |"[]*flow.Record"| DD
+    RB --> |chan *flow.Record| ACC(flow.Accounter)
+    ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
+    M --> |"chan []*flow.Record"| DD
     subgraph Optional
         DD
     end
-    DD --> |"[]*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
+    DD --> |"chan []*flow.Record"| CL(flow.CapacityLimiter)
+    CL --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
 ```
\ No newline at end of file
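To make the chart concrete: each box is a pipeline stage run by the netobserv/gopipes library, and each arrow is a (possibly buffered) Go channel between stages. Below is a minimal sketch of a three-stage pipeline using the same `node.AsInit`/`node.AsMiddle`/`node.AsTerminal`/`SendsTo` API that appears in the agent code further down in this commit. The `src`/`double`/`sink` stages and the `int` payload are invented for illustration, and the sketch assumes the library closes a stage's downstream channel when that stage's function returns (the ranged-channel loops in this commit rely on that behavior).

```go
package main

import (
	"fmt"

	"github.com/netobserv/gopipes/pkg/node"
)

func main() {
	done := make(chan struct{})

	// Source stage: analogous to flow.RingBufTracer pushing records.
	src := node.AsInit(func(out chan<- int) {
		for i := 1; i <= 3; i++ {
			out <- i
		}
		// assumption: when this function returns, the library closes
		// the downstream channel, shutting the pipeline down in order
	})

	// Middle stage: analogous to flow.Accounter or flow.Deduper.
	double := node.AsMiddle(func(in <-chan int, out chan<- int) {
		for v := range in {
			out <- v * 2
		}
	}, node.ChannelBufferLen(50)) // same option the agent uses for BUFFERS_LENGTH

	// Terminal stage: analogous to the GRPC/Kafka exporter.
	sink := node.AsTerminal(func(in <-chan int) {
		for v := range in {
			fmt.Println(v) // prints 2, 4, 6
		}
		close(done)
	})

	src.SendsTo(double)
	double.SendsTo(sink)
	src.Start() // only source nodes are started explicitly
	<-done
}
```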
@@ -59,6 +59,10 @@ so no user should need to change them.
 * `BUFFERS_LENGTH` (default: `50`). Length of the internal communication channels between the different
   processing stages.
+* `EXPORTER_BUFFER_LENGTH` (default: value of `BUFFERS_LENGTH`). Establishes the length of the buffer
+  of flow batches (not individual flows) that can be accumulated before the Kafka or GRPC exporter.
+  When this buffer is full (e.g. because the Kafka or GRPC endpoint is slow), incoming flow batches
+  will be dropped. If unset, its value is the same as the `BUFFERS_LENGTH` property.
 * `KAFKA_ASYNC` (default: `true`). If `true`, the message writing process will never block. It also
   means that errors are ignored since the caller will not receive the returned value.
 * `LISTEN_INTERFACES` (default: `watch`). Mechanism used by the agent to listen for added or removed
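As a sketch of the fallback semantics (assuming the `caarlos0/env` parser that the `env:` struct tags in the config change below suggest; the standalone `Config` here is trimmed to two fields for illustration): an unset `EXPORTER_BUFFER_LENGTH` leaves the field at its zero value, and the agent then substitutes `BUFFERS_LENGTH`.

```go
package main

import (
	"fmt"
	"os"

	"github.com/caarlos0/env/v6" // assumption: the parser behind the `env:` tags
)

type Config struct {
	BuffersLength        int `env:"BUFFERS_LENGTH" envDefault:"50"`
	ExporterBufferLength int `env:"EXPORTER_BUFFER_LENGTH"` // no default: zero means "unset"
}

func main() {
	os.Setenv("EXPORTER_BUFFER_LENGTH", "200")

	var cfg Config
	if err := env.Parse(&cfg); err != nil {
		panic(err)
	}
	// The agent applies the documented fallback only when the variable was unset.
	if cfg.ExporterBufferLength == 0 {
		cfg.ExporterBufferLength = cfg.BuffersLength
	}
	fmt.Println(cfg.BuffersLength, cfg.ExporterBufferLength) // 50 200
}
```

Note that an explicit `EXPORTER_BUFFER_LENGTH=0` is indistinguishable from leaving the variable unset, so the fallback to `BUFFERS_LENGTH` applies in that case too.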
@@ -265,9 +265,17 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, error) {
 	accounter := node.AsMiddle(f.accounter.Account,
 		node.ChannelBufferLen(f.cfg.BuffersLength))
-	export := node.AsTerminal(f.exporter,
+	limiter := node.AsMiddle((&flow.CapacityLimiter{}).Limit,
 		node.ChannelBufferLen(f.cfg.BuffersLength))
+
+	ebl := f.cfg.ExporterBufferLength
+	if ebl == 0 {
+		ebl = f.cfg.BuffersLength
+	}
+	export := node.AsTerminal(f.exporter,
+		node.ChannelBufferLen(ebl))
+
 	rbTracer.SendsTo(accounter)

 	if f.cfg.Deduper == DeduperFirstCome {
@@ -275,11 +283,12 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, error) {
 			node.ChannelBufferLen(f.cfg.BuffersLength))
 		mapTracer.SendsTo(deduper)
 		accounter.SendsTo(deduper)
-		deduper.SendsTo(export)
+		deduper.SendsTo(limiter)
 	} else {
-		mapTracer.SendsTo(export)
-		accounter.SendsTo(export)
+		mapTracer.SendsTo(limiter)
+		accounter.SendsTo(limiter)
 	}
+	limiter.SendsTo(export)

 	alog.Debug("starting graph")
 	mapTracer.Start()
 	rbTracer.Start()
@@ -35,6 +35,11 @@ type Config struct {
 	// BuffersLength establishes the length of communication channels between the different processing
 	// stages
 	BuffersLength int `env:"BUFFERS_LENGTH" envDefault:"50"`
+	// ExporterBufferLength establishes the length of the buffer of flow batches (not individual flows)
+	// that can be accumulated before the Kafka or GRPC exporter. When this buffer is full (e.g.
+	// because the Kafka or GRPC endpoint is slow), incoming flow batches will be dropped. If unset,
+	// its value is the same as the BUFFERS_LENGTH property.
+	ExporterBufferLength int `env:"EXPORTER_BUFFER_LENGTH"`
 	// CacheMaxFlows specifies how many flows can be accumulated in the accounting cache before
 	// being flushed for its later export
 	CacheMaxFlows int `env:"CACHE_MAX_FLOWS" envDefault:"5000"`
package flow

import (
	"time"

	"github.com/sirupsen/logrus"
)

const initialLogPeriod = time.Minute
const maxLogPeriod = time.Hour

var cllog = logrus.WithField("component", "capacity.Limiter")

// CapacityLimiter forwards the flows between two nodes but checks the status of the destination
// node's buffered channel. If it is already full, it drops the incoming flow and periodically
// logs a message about the number of lost flows.
type CapacityLimiter struct {
	droppedFlows int
}

func (c *CapacityLimiter) Limit(in <-chan []*Record, out chan<- []*Record) {
	go c.logDroppedFlows()
	for i := range in {
		if len(out) < cap(out) {
			out <- i
		} else {
			c.droppedFlows += len(i)
		}
	}
}

func (c *CapacityLimiter) logDroppedFlows() {
	logPeriod := initialLogPeriod
	debugging := logrus.IsLevelEnabled(logrus.DebugLevel)
	for {
		time.Sleep(logPeriod)

		// a race condition might happen in this counter but it's not important as it's just for
		// logging purposes
		df := c.droppedFlows
		if df > 0 {
			c.droppedFlows = 0
			cllog.Warnf("%d flows were dropped during the last %s because the agent is forwarding "+
				"more flows than the remote ingestor is able to process. You might "+
				"want to increase the CACHE_MAX_FLOWS and CACHE_ACTIVE_TIMEOUT properties",
				df, logPeriod)

			// if not debug logs, backoff to avoid flooding the log with warning messages
			if !debugging && logPeriod < maxLogPeriod {
				logPeriod *= 2
				if logPeriod > maxLogPeriod {
					logPeriod = maxLogPeriod
				}
			}
		} else {
			logPeriod = initialLogPeriod
		}
	}
}
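The core technique above is the non-blocking capacity check `len(out) < cap(out)` before each send. A self-contained sketch of the same drop-on-full policy with plain channels (all names invented for illustration):

```go
package main

import "fmt"

func main() {
	out := make(chan string, 2) // stand-in for the exporter's buffered input channel
	dropped := 0

	for _, batch := range []string{"a", "b", "c", "d"} {
		if len(out) < cap(out) {
			out <- batch // room left: forward the batch
		} else {
			dropped++ // buffer full: drop instead of blocking the whole pipeline
		}
	}
	fmt.Println(len(out), dropped) // 2 2: "c" and "d" were dropped
}
```

Because the limiter is the only sender on `out`, a passing check can never turn into a blocking send; a concurrent drain by the exporter can at worst cause an occasional unnecessary drop. With multiple senders, a `select` with a `default` branch would be the race-free way to express the same policy.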
package flow

import (
	"strconv"
	"testing"

	"github.com/netobserv/gopipes/pkg/node"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

const limiterLen = 50

func TestCapacityLimiter_NoDrop(t *testing.T) {
	// GIVEN a limiter-enabled pipeline
	pipeIn, pipeOut := capacityLimiterPipe()

	// WHEN it buffers fewer elements than its maximum capacity
	for i := 0; i < 33; i++ {
		pipeIn <- []*Record{{Interface: strconv.Itoa(i)}}
	}

	// THEN it is able to retrieve all the buffered elements
	for i := 0; i < 33; i++ {
		elem := <-pipeOut
		require.Len(t, elem, 1)
		assert.Equal(t, strconv.Itoa(i), elem[0].Interface)
	}

	// AND not a single extra element
	select {
	case elem := <-pipeOut:
		assert.Failf(t, "unexpected element", "%#v", elem)
	default:
		// ok!
	}
}

func TestCapacityLimiter_Drop(t *testing.T) {
	// GIVEN a limiter-enabled pipeline
	pipeIn, pipeOut := capacityLimiterPipe()

	// WHEN it receives more elements than its maximum capacity
	// (it's not blocking)
	for i := 0; i < limiterLen*2; i++ {
		pipeIn <- []*Record{{Interface: strconv.Itoa(i)}}
	}

	// THEN it is only able to retrieve the first n buffered elements
	// (plus the single element that is buffered in the output channel)
	for i := 0; i < limiterLen+1; i++ {
		elem := <-pipeOut
		require.Len(t, elem, 1)
		assert.Equal(t, strconv.Itoa(i), elem[0].Interface)
	}

	// BUT not a single extra element
	select {
	case elem := <-pipeOut:
		assert.Failf(t, "unexpected element", "%#v", elem)
	default:
		// ok!
	}
}

func capacityLimiterPipe() (in chan<- []*Record, out <-chan []*Record) {
	inCh, outCh := make(chan []*Record), make(chan []*Record)

	init := node.AsInit(func(initOut chan<- []*Record) {
		for i := range inCh {
			initOut <- i
		}
	})
	limiter := node.AsMiddle((&CapacityLimiter{}).Limit)
	term := node.AsTerminal(func(termIn <-chan []*Record) {
		for i := range termIn {
			outCh <- i
		}
	}, node.ChannelBufferLen(limiterLen))

	init.SendsTo(limiter)
	limiter.SendsTo(term)

	init.Start()

	return inCh, outCh
}