Newer
Older
"github.com/gavv/monotime"
test2 "github.com/mariomac/guara/pkg/test"
Mohamed S. Mahmoud
committed
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
var agentIP = "192.168.1.13"
const timeout = 2 * time.Second
func TestFlowsAgent_InvalidConfigs(t *testing.T) {
for _, tc := range []struct {
d string
c Config
}{{
d: "invalid export type",
c: Config{Export: "foo"},
}, {
d: "GRPC: missing host",
c: Config{Export: "grpc", TargetPort: 3333},
}, {
d: "GRPC: missing port",
c: Config{Export: "grpc", TargetHost: "flp"},
}, {
d: "Kafka: missing brokers",
c: Config{Export: "kafka"},
}} {
t.Run(tc.d, func(t *testing.T) {
_, err := FlowsAgent(&tc.c)
assert.Error(t, err)
})
}
}
Mohamed S. Mahmoud
committed
key1 = ebpf.BpfFlowId{
SrcPort: 123,
DstPort: 456,
IfIndex: 3,
Mohamed S. Mahmoud
committed
key1Dupe = ebpf.BpfFlowId{
SrcPort: 123,
DstPort: 456,
IfIndex: 4,
}
Mohamed S. Mahmoud
committed
key2 = ebpf.BpfFlowId{
SrcPort: 333,
DstPort: 532,
IfIndex: 3,
}
)
func TestFlowsAgent_Deduplication(t *testing.T) {
export := testAgent(t, &Config{
CacheActiveTimeout: 10 * time.Millisecond,
CacheMaxFlows: 100,
DeduperJustMark: false,
Deduper: DeduperFirstCome,
})
exported := export.Get(t, timeout)
Mohamed S. Mahmoud
committed
assert.Len(t, exported, 2)
Mohamed S. Mahmoud
committed
receivedKeys := map[ebpf.BpfFlowId]struct{}{}
var key1Flows []*flow.Record
for _, f := range exported {
Mohamed S. Mahmoud
committed
require.NotContains(t, receivedKeys, f.Id)
receivedKeys[f.Id] = struct{}{}
switch f.Id {
Mohamed S. Mahmoud
committed
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
Mohamed S. Mahmoud
committed
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "bar", f.Interface)
key1Flows = append(key1Flows, f)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows)
}
func TestFlowsAgent_DeduplicationJustMark(t *testing.T) {
export := testAgent(t, &Config{
CacheActiveTimeout: 10 * time.Millisecond,
CacheMaxFlows: 100,
DeduperJustMark: true,
Deduper: DeduperFirstCome,
})
exported := export.Get(t, timeout)
Mohamed S. Mahmoud
committed
receivedKeys := map[ebpf.BpfFlowId]struct{}{}
Mohamed S. Mahmoud
committed
assert.Len(t, exported, 3)
duplicates := 0
for _, f := range exported {
Mohamed S. Mahmoud
committed
require.NotContains(t, receivedKeys, f.Id)
receivedKeys[f.Id] = struct{}{}
switch f.Id {
Mohamed S. Mahmoud
committed
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "foo", f.Interface)
Mohamed S. Mahmoud
committed
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "bar", f.Interface)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
Mohamed S. Mahmoud
committed
assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported)
}
func TestFlowsAgent_Deduplication_None(t *testing.T) {
export := testAgent(t, &Config{
CacheActiveTimeout: 10 * time.Millisecond,
CacheMaxFlows: 100,
Deduper: DeduperNone,
})
exported := export.Get(t, timeout)
Mohamed S. Mahmoud
committed
assert.Len(t, exported, 3)
Mohamed S. Mahmoud
committed
receivedKeys := map[ebpf.BpfFlowId]struct{}{}
var key1Flows []*flow.Record
for _, f := range exported {
Mohamed S. Mahmoud
committed
require.NotContains(t, receivedKeys, f.Id)
receivedKeys[f.Id] = struct{}{}
switch f.Id {
Mohamed S. Mahmoud
committed
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
Mohamed S. Mahmoud
committed
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "bar", f.Interface)
key1Flows = append(key1Flows, f)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
Mohamed S. Mahmoud
committed
assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows)
func TestFlowsAgent_Decoration(t *testing.T) {
export := testAgent(t, &Config{
CacheActiveTimeout: 10 * time.Millisecond,
CacheMaxFlows: 100,
})
exported := export.Get(t, timeout)
Mohamed S. Mahmoud
committed
assert.Len(t, exported, 3)
// Tests that the decoration stage has been properly executed. It should
// add the interface name and the agent IP
for _, f := range exported {
assert.Equal(t, agentIP, f.AgentIP.String())
Mohamed S. Mahmoud
committed
switch f.Id {
case key1, key2:
assert.Equal(t, "foo", f.Interface)
default:
assert.Equal(t, "bar", f.Interface)
}
}
}
func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
Mohamed S. Mahmoud
committed
ebpfTracer := test.NewTracerFake()
export := test.NewExporterFake()
agent, err := flowsAgent(cfg,
test.SliceInformerFake{
{Name: "foo", Index: 3},
{Name: "bar", Index: 4},
Mohamed S. Mahmoud
committed
}, ebpfTracer, export.Export,
require.NoError(t, err)
go func() {
require.NoError(t, agent.Run(context.Background()))
}()
test2.Eventually(t, timeout, func(t require.TestingT) {
require.Equal(t, StatusStarted, agent.status)
})
now := uint64(monotime.Now())
Mohamed S. Mahmoud
committed
key1Metrics := []ebpf.BpfFlowMetrics{
{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000},
{Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000},
}
key2Metrics := []ebpf.BpfFlowMetrics{
{Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000},
}
ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{
key1: key1Metrics,
key1Dupe: key1Metrics,
key2: key2Metrics,