Skip to content
Snippets Groups Projects
Unverified Commit aa7838d8 authored by Mario Macias's avatar Mario Macias Committed by GitHub
Browse files

NETOBSERV-759: decorate flows with agent IP (#78)

* NETOBSERV-759: decorate flows with agent IP

only for Protocol Buffers encoding. IPFIX is still missing.

* Fix GRPC export + test
parent cb991054
Branches
Tags
No related merge requests found
Showing with 540 additions and 75 deletions
......@@ -22,7 +22,7 @@ IMG_SHA = $(IMAGE_TAG_BASE):$(BUILD_SHA)
LOCAL_GENERATOR_IMAGE ?= ebpf-generator:latest
CILIUM_EBPF_VERSION := v0.8.1
GOLANGCI_LINT_VERSION = v1.46.2
GOLANGCI_LINT_VERSION = v1.50.1
CLANG ?= clang
CFLAGS := -O2 -g -Wall -Werror $(CFLAGS)
......
......@@ -23,5 +23,7 @@ flowchart TD
DD --> |"chan []*flow.Record"| CL(flow.CapacityLimiter)
CL --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
CL --> |"chan []*flow.Record"| DC(flow.Decorator)
DC --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
```
\ No newline at end of file
......@@ -5,6 +5,14 @@ The following environment variables are available to configure the NetObserv eBF
* `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc` or `kafka` or `ipfix+tcp` or `ipfix+udp`.
* `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector.
* `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector.
* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow.
* `AGENT_IP_IFACE` (default: `external`). Specifies which interface should the agent pick the IP
address from in order to report it in the AgentIP field on each flow. Accepted values are:
`external` (default), `local`, or `name:<interface name>` (e.g. `name:eth0`). If the `AGENT_IP`
configuration property is set, this property has no effect.
* `AGENT_IP_TYPE` (default: `any`). Specifies which type of IP address (IPv4 or IPv6 or any) should
the agent report in the AgentID field of each flow. Accepted values are: `any` (default), `ipv4`,
`ipv6`. If the `AGENT_IP` configuration property is set, this property has no effect.
* `INTERFACES` (optional). Comma-separated list of the interface names from where flows will be collected. If
empty, the agent will use all the interfaces in the system, excepting the ones listed in
the `EXCLUDE_INTERFACES` variable.
......
......@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"net"
"time"
"github.com/cilium/ebpf/ringbuf"
......@@ -65,6 +66,10 @@ type Flows struct {
accounter *flow.Accounter
exporter flowExporter
// elements used to decorate flows with extra information
interfaceNamer flow.InterfaceNamer
agentIP net.IP
status Status
}
......@@ -101,6 +106,13 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
informer = ifaces.NewWatcher(cfg.BuffersLength)
}
alog.Debug("acquiring Agent IP")
agentIP, err := fetchAgentIP(cfg)
if err != nil {
return nil, fmt.Errorf("acquiring Agent IP: %w", err)
}
alog.Debug("agent IP: " + agentIP.String())
// configure selected exporter
exportFunc, err := buildFlowExporter(cfg)
if err != nil {
......@@ -119,7 +131,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
return nil, err
}
return flowsAgent(cfg, informer, fetcher, exportFunc)
return flowsAgent(cfg, informer, fetcher, exportFunc, agentIP)
}
// flowsAgent is a private constructor with injectable dependencies, usable for tests
......@@ -127,6 +139,7 @@ func flowsAgent(cfg *Config,
informer ifaces.Informer,
fetcher ebpfFlowFetcher,
exporter flowExporter,
agentIP net.IP,
) (*Flows, error) {
// configure allow/deny interfaces filter
filter, err := initInterfaceFilter(cfg.Interfaces, cfg.ExcludeInterfaces)
......@@ -144,19 +157,21 @@ func flowsAgent(cfg *Config,
return iface
}
mapTracer := flow.NewMapTracer(fetcher, interfaceNamer, cfg.CacheActiveTimeout)
mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout)
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout)
accounter := flow.NewAccounter(
cfg.CacheMaxFlows, cfg.CacheActiveTimeout, interfaceNamer, time.Now, monotime.Now)
cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now)
return &Flows{
ebpf: fetcher,
exporter: exporter,
interfaces: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
ebpf: fetcher,
exporter: exporter,
interfaces: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
agentIP: agentIP,
interfaceNamer: interfaceNamer,
}, nil
}
......@@ -345,6 +360,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
limiter := node.AsMiddle((&flow.CapacityLimiter{}).Limit,
node.ChannelBufferLen(f.cfg.BuffersLength))
decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer),
node.ChannelBufferLen(f.cfg.BuffersLength))
ebl := f.cfg.ExporterBufferLength
if ebl == 0 {
ebl = f.cfg.BuffersLength
......@@ -365,7 +383,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
mapTracer.SendsTo(limiter)
accounter.SendsTo(limiter)
}
limiter.SendsTo(export)
limiter.SendsTo(decorator)
decorator.SendsTo(export)
alog.Debug("starting graph")
mapTracer.Start()
rbTracer.Start()
......
......@@ -2,6 +2,7 @@ package agent
import (
"context"
"net"
"testing"
"time"
......@@ -13,6 +14,8 @@ import (
"github.com/stretchr/testify/require"
)
var agentIP = "192.168.1.13"
const timeout = 2 * time.Second
func TestFlowsAgent_InvalidConfigs(t *testing.T) {
......@@ -171,6 +174,28 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows)
}
func TestFlowsAgent_Decoration(t *testing.T) {
export := testAgent(t, &Config{
CacheActiveTimeout: 10 * time.Millisecond,
CacheMaxFlows: 100,
})
exported := export.Get(t, timeout)
assert.Len(t, exported, 3)
// Tests that the decoration stage has been properly executed. It should
// add the interface name and the agent IP
for _, f := range exported {
assert.Equal(t, agentIP, f.AgentIP.String())
switch f.RecordKey {
case key1, key2:
assert.Equal(t, "foo", f.Interface)
default:
assert.Equal(t, "bar", f.Interface)
}
}
}
func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
ebpf := test.NewTracerFake()
export := test.NewExporterFake()
......@@ -178,7 +203,8 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
test.SliceInformerFake{
{Name: "foo", Index: 3},
{Name: "bar", Index: 4},
}, ebpf, export.Export)
}, ebpf, export.Export,
net.ParseIP(agentIP))
require.NoError(t, err)
go func() {
......
......@@ -12,9 +12,28 @@ const (
DirectionIngress = "ingress"
DirectionEgress = "egress"
DirectionBoth = "both"
IPTypeAny = "any"
IPTypeIPV4 = "ipv4"
IPTypeIPV6 = "ipv6"
IPIfaceExternal = "external"
IPIfaceLocal = "local"
IPIfaceNamedPrefix = "name:"
)
type Config struct {
// AgentIP allows overriding the reported Agent IP address on each flow.
AgentIP string `env:"AGENT_IP"`
// AgentIPIface specifies which interface should the agent pick the IP address from in order to
// report it in the AgentIP field on each flow. Accepted values are: external (default), local,
// or name:<interface name> (e.g. name:eth0).
// If the AgentIP configuration property is set, this property has no effect.
AgentIPIface string `env:"AGENT_IP_IFACE" envDefault:"external"`
// AgentIPType specifies which type of IP address (IPv4 or IPv6 or any) should the agent report
// in the AgentID field of each flow. Accepted values are: any (default), ipv4, ipv6.
// If the AgentIP configuration property is set, this property has no effect.
AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"`
// Export selects the flows' exporter protocol. Accepted values are: grpc (default) or kafka
// or ipfix+udp or ipfix+tcp.
Export string `env:"EXPORT" envDefault:"grpc"`
......
package agent
import (
"fmt"
"net"
"strings"
)
// dependencies that can be injected from testing
var (
interfaceByName = net.InterfaceByName
interfaceAddrs = net.InterfaceAddrs
dial = net.Dial
ifaceAddrs = func(iface *net.Interface) ([]net.Addr, error) {
return iface.Addrs()
}
)
// fetchAgentIP guesses the non-loopback IP address of the Agent host, according to the
// user-provided configuration:
// - If AgentIP is provided, this value is used whatever is the real IP of the Agent.
// - AgentIPIface specifies which interface this function should look into in order to pickup an address.
// - AgentIPType specifies which type of IP address should the agent pickup ("any" to pickup whichever
// ipv4 or ipv6 address is found first)
func fetchAgentIP(cfg *Config) (net.IP, error) {
if cfg.AgentIP != "" {
if ip := net.ParseIP(cfg.AgentIP); ip != nil {
return ip, nil
}
return nil, fmt.Errorf("can't parse provided IP %v", cfg.AgentIP)
}
if cfg.AgentIPType != IPTypeAny &&
cfg.AgentIPType != IPTypeIPV6 &&
cfg.AgentIPType != IPTypeIPV4 {
return nil, fmt.Errorf("invalid IP type %q. Valid values are: %s, %s or %s",
cfg.AgentIPType, IPTypeIPV4, IPTypeIPV6, IPTypeAny)
}
switch cfg.AgentIPIface {
case IPIfaceLocal:
return fromLocal(cfg.AgentIPType)
case IPIfaceExternal:
return fromExternal(cfg.AgentIPType)
default:
if !strings.HasPrefix(cfg.AgentIPIface, IPIfaceNamedPrefix) {
return nil, fmt.Errorf(
"invalid IP interface %q. Valid values are: %s, %s or %s<iface_name>",
cfg.AgentIPIface, IPIfaceLocal, IPIfaceExternal, IPIfaceNamedPrefix)
}
return fromInterface(cfg.AgentIPIface[len(IPIfaceNamedPrefix):], cfg.AgentIPType)
}
}
func fromInterface(ifName, ipType string) (net.IP, error) {
iface, err := interfaceByName(ifName)
if err != nil {
return nil, err
}
addrs, err := ifaceAddrs(iface)
if err != nil {
return nil, err
}
if ip, ok := findAddress(addrs, ipType); ok {
return ip, nil
}
return nil, fmt.Errorf("no matching %q addresses found at interface %v", ipType, ifName)
}
func fromLocal(ipType string) (net.IP, error) {
addrs, err := interfaceAddrs()
if err != nil {
return nil, err
}
if ip, ok := findAddress(addrs, ipType); ok {
return ip, nil
}
return nil, fmt.Errorf("no matching local %q addresses found", ipType)
}
func fromExternal(ipType string) (net.IP, error) {
// We don't really care about the existence or nonexistence of the addresses.
// This will just establish an external dialer where we can pickup the external
// host address
addrStr := "8.8.8.8:80"
if ipType == IPTypeIPV6 {
addrStr = "[2001:4860:4860::8888]:80"
}
conn, err := dial("udp", addrStr)
if err != nil {
return nil, fmt.Errorf("can't establish an external connection %w", err)
}
if addr, ok := conn.LocalAddr().(*net.UDPAddr); !ok {
return nil, fmt.Errorf("unexpected local address type %T for external connection",
conn.LocalAddr())
} else if ip, ok := getIP(addr.IP, ipType); ok {
return ip, nil
}
return nil, fmt.Errorf("no matching %q external addresses found", ipType)
}
func findAddress(addrs []net.Addr, ipType string) (net.IP, bool) {
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet != nil {
if ip, ok := getIP(ipnet.IP, ipType); ok {
return ip, true
}
}
}
return nil, false
}
func getIP(pip net.IP, ipType string) (net.IP, bool) {
if pip == nil || pip.IsLoopback() {
return nil, false
}
switch ipType {
case IPTypeIPV4:
if ip := pip.To4(); ip != nil {
return ip, true
}
case IPTypeIPV6:
// as any IP4 address can be converted to IP6, we only return any
// address that can be converted to IP6 but not to IP4
if ip := pip.To16(); ip != nil && pip.To4() == nil {
return ip, true
}
default: // Any
if ip := pip.To4(); ip != nil {
return ip, true
}
if ip := pip.To16(); ip != nil {
return ip, true
}
}
return nil, false
}
package agent
import (
"errors"
"net"
"regexp"
"testing"
"time"
"github.com/stretchr/testify/require"
)
var (
localIP4 = net.ParseIP("10.0.0.10")
localIP6 = net.ParseIP("2001:0db8::1111")
externalIP4 = net.ParseIP("84.88.89.90")
externalIP6 = net.ParseIP("2001:0db8::eeee")
testIFName = "teth1"
testIfIP4 = net.ParseIP("10.1.2.3")
testIfIP6 = net.ParseIP("2001:0db8::6666")
testIFName2 = "teth2"
testIf2IP6 = net.ParseIP("2001:0db8::6262")
)
func TestAgentIP_Any(t *testing.T) {
mockIfaces()
type testCase struct {
dsc string
cfg Config
expect net.IP
}
for _, tc := range []testCase{
{dsc: "hardcoding IPv4 address",
cfg: Config{AgentIP: "192.168.1.13"},
expect: net.IPv4(192, 168, 1, 13)},
{dsc: "hardcoding IPv6 address",
cfg: Config{AgentIP: "2002:0db9::7336"},
expect: net.ParseIP("2002:0db9::7336")},
{dsc: "any local address",
cfg: Config{AgentIPIface: "local", AgentIPType: "any"},
expect: localIP4},
{dsc: "local IPv4 address",
cfg: Config{AgentIPIface: "local", AgentIPType: "ipv4"},
expect: localIP4},
{dsc: "local IPv6 address",
cfg: Config{AgentIPIface: "local", AgentIPType: "ipv6"},
expect: localIP6},
{dsc: "any external address",
cfg: Config{AgentIPIface: "external", AgentIPType: "any"},
expect: externalIP4},
{dsc: "external IPv4 address",
cfg: Config{AgentIPIface: "external", AgentIPType: "ipv4"},
expect: externalIP4},
{dsc: "external IPv6 address",
cfg: Config{AgentIPIface: "external", AgentIPType: "ipv6"},
expect: externalIP6},
{dsc: "any IP given an interface name",
cfg: Config{AgentIPIface: "name:" + testIFName, AgentIPType: "any"},
expect: testIfIP4},
{dsc: "IPv4 address given an interface name",
cfg: Config{AgentIPIface: "name:" + testIFName, AgentIPType: "ipv4"},
expect: testIfIP4},
{dsc: "IPv6 address given an interface name",
cfg: Config{AgentIPIface: "name:" + testIFName, AgentIPType: "ipv6"},
expect: testIfIP6},
{dsc: "any IP given an IPV6-only interface name",
cfg: Config{AgentIPIface: "name:" + testIFName2, AgentIPType: "any"},
expect: testIf2IP6},
{dsc: "IPv6 address given an IPV6-only interface name",
cfg: Config{AgentIPIface: "name:" + testIFName2, AgentIPType: "ipv6"},
expect: testIf2IP6},
} {
t.Run(tc.dsc, func(t *testing.T) {
ip, err := fetchAgentIP(&tc.cfg)
require.NoError(t, err)
require.Truef(t, tc.expect.Equal(ip), "expected: %s. Got: %s", tc.expect, ip)
})
}
}
func mockIfaces() {
// mock local addresses retrieval
interfaceAddrs = func() ([]net.Addr, error) {
return []net.Addr{
&net.IPNet{IP: net.ParseIP("127.0.0.1")},
&net.IPNet{IP: localIP4},
&net.IPNet{IP: localIP6},
}, nil
}
// mock external address retrieval
dial = func(_, address string) (net.Conn, error) {
// IPv4 address
if regexp.MustCompile(`^\d+(\.\d+){3}(:\d+)?$`).MatchString(address) {
return &connMock{ip: externalIP4}, nil
}
return &connMock{ip: externalIP6}, nil
}
// mock interface retrieval by name
interfaceByName = func(name string) (*net.Interface, error) {
if name != testIFName && name != testIFName2 {
return nil, errors.New("unknown interface " + name)
}
return &net.Interface{
Name: name,
}, nil
}
// mock test interface address retrieval
ifaceAddrs = func(iface *net.Interface) ([]net.Addr, error) {
switch iface.Name {
case testIFName:
return []net.Addr{
&net.IPNet{IP: testIfIP4},
&net.IPNet{IP: testIfIP6},
}, nil
case testIFName2:
return []net.Addr{
&net.IPNet{IP: testIf2IP6},
}, nil
}
return iface.Addrs()
}
}
type connMock struct {
ip net.IP
}
func (c *connMock) LocalAddr() net.Addr {
return &net.UDPAddr{IP: c.ip}
}
func (c *connMock) Read(_ []byte) (n int, err error) { panic("unexpected call") }
func (c *connMock) Write(_ []byte) (n int, err error) { panic("unexpected call") }
func (c *connMock) Close() error { panic("unexpected call") }
func (c *connMock) RemoteAddr() net.Addr { panic("unexpected call") }
func (c *connMock) SetDeadline(_ time.Time) error { panic("unexpected call") }
func (c *connMock) SetReadDeadline(_ time.Time) error { panic("unexpected call") }
func (c *connMock) SetWriteDeadline(_ time.Time) error { panic("unexpected call") }
package exporter
import (
"fmt"
"net"
"testing"
"time"
"github.com/mariomac/guara/pkg/test"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const timeout = 2 * time.Second
func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
// start remote ingestor
port, err := test.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
_, err = grpc.StartCollector(port, serverOut)
require.NoError(t, err)
// Start GRPCProto exporter stage
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port))
require.NoError(t, err)
// Send some flows to the input of the exporter stage
flows := make(chan []*flow.Record, 10)
flows <- []*flow.Record{
{AgentIP: net.ParseIP("10.9.8.7")},
}
flows <- []*flow.Record{
{RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{EthProtocol: flow.IPv6Type}},
AgentIP: net.ParseIP("8888::1111")},
}
close(flows)
go exporter.ExportFlows(flows)
var rs *pbflow.Records
select {
case rs = <-serverOut:
case <-time.After(timeout):
require.Fail(t, "timeout waiting for flows")
}
assert.Len(t, rs.Entries, 1)
r := rs.Entries[0]
assert.EqualValues(t, 0x0a090807, r.GetAgentIp().GetIpv4())
select {
case rs = <-serverOut:
case <-time.After(timeout):
require.Fail(t, "timeout waiting for flows")
}
assert.Len(t, rs.Entries, 1)
r = rs.Entries[0]
assert.EqualValues(t, net.ParseIP("8888::1111"), r.GetAgentIp().GetIpv6())
select {
case rs = <-serverOut:
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
default:
//ok!
}
}
......@@ -12,6 +12,9 @@ import (
var ilog = logrus.WithField("component", "exporter/IPFIXProto")
// TODO: encode also the equivalent of the Protobuf's AgentIP field in a format that is binary-
// compatible with OVN-K.
type IPFIX struct {
hostPort string
exporter *ipfixExporter.ExportingProcess
......
package exporter
import (
"encoding/binary"
"net"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"google.golang.org/protobuf/types/known/timestamppb"
......@@ -56,6 +59,7 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
Packets: uint64(fr.Packets),
Interface: fr.Interface,
Duplicate: fr.Duplicate,
AgentIp: agentIP(fr.AgentIP),
}
}
......@@ -88,6 +92,7 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
Packets: uint64(fr.Packets),
Interface: fr.Interface,
Duplicate: fr.Duplicate,
AgentIp: agentIP(fr.AgentIP),
}
}
......@@ -101,3 +106,11 @@ func macToUint64(m *flow.MacAddr) uint64 {
(uint64(m[1]) << 32) |
(uint64(m[0]) << 40)
}
func agentIP(nip net.IP) *pbflow.IP {
if ip := nip.To4(); ip != nil {
return &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: binary.BigEndian.Uint32(ip)}}
}
// IPv6 address
return &pbflow.IP{IpFamily: &pbflow.IP_Ipv6{Ipv6: nip}}
}
......@@ -16,7 +16,6 @@ type Accounter struct {
entries map[RecordKey]*RecordMetrics
clock func() time.Time
monoClock func() time.Duration
namer InterfaceNamer
}
var alog = logrus.WithField("component", "flow/Accounter")
......@@ -24,7 +23,7 @@ var alog = logrus.WithField("component", "flow/Accounter")
// NewAccounter creates a new Accounter.
// The cache has no limit and it's assumed that eviction is done by the caller.
func NewAccounter(
maxEntries int, evictTimeout time.Duration, ifaceNamer InterfaceNamer,
maxEntries int, evictTimeout time.Duration,
clock func() time.Time,
monoClock func() time.Duration,
) *Accounter {
......@@ -32,7 +31,6 @@ func NewAccounter(
maxEntries: maxEntries,
evictTimeout: evictTimeout,
entries: map[RecordKey]*RecordMetrics{},
namer: ifaceNamer,
clock: clock,
monoClock: monoClock,
}
......@@ -86,7 +84,7 @@ func (c *Accounter) evict(entries map[RecordKey]*RecordMetrics, evictor chan<- [
monotonicNow := uint64(c.monoClock())
records := make([]*Record, 0, len(entries))
for key, metrics := range entries {
records = append(records, NewRecord(key, *metrics, now, monotonicNow, c.namer))
records = append(records, NewRecord(key, *metrics, now, monotonicNow))
}
alog.WithField("numEntries", len(records)).Debug("records evicted from userspace accounter")
evictor <- records
......
......@@ -37,9 +37,7 @@ var k3 = RecordKey{
func TestEvict_MaxEntries(t *testing.T) {
// GIVEN an accounter
now := time.Date(2022, 8, 23, 16, 33, 22, 0, time.UTC)
acc := NewAccounter(2, time.Hour, func(ifIndex int) string {
return "foo"
}, func() time.Time {
acc := NewAccounter(2, time.Hour, func() time.Time {
return now
}, func() time.Duration {
return 1000
......@@ -103,7 +101,6 @@ func TestEvict_MaxEntries(t *testing.T) {
},
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
Interface: "foo",
},
k2: {
RawRecord: RawRecord{
......@@ -114,7 +111,6 @@ func TestEvict_MaxEntries(t *testing.T) {
},
TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond),
Interface: "foo",
},
}, received)
}
......@@ -122,9 +118,7 @@ func TestEvict_MaxEntries(t *testing.T) {
func TestEvict_Period(t *testing.T) {
// GIVEN an accounter
now := time.Date(2022, 8, 23, 16, 33, 22, 0, time.UTC)
acc := NewAccounter(200, 20*time.Millisecond, func(ifIndex int) string {
return "foo"
}, func() time.Time {
acc := NewAccounter(200, 20*time.Millisecond, func() time.Time {
return now
}, func() time.Duration {
return 1000
......@@ -182,7 +176,6 @@ func TestEvict_Period(t *testing.T) {
EndMonoTimeNs: 789,
},
},
Interface: "foo",
TimeFlowStart: now.Add(-1000 + 123),
TimeFlowEnd: now.Add(-1000 + 789),
}, *records[0])
......@@ -198,7 +191,6 @@ func TestEvict_Period(t *testing.T) {
EndMonoTimeNs: 1456,
},
},
Interface: "foo",
TimeFlowStart: now.Add(-1000 + 1123),
TimeFlowEnd: now.Add(-1000 + 1456),
}, *records[0])
......
package flow
import (
"net"
)
type InterfaceNamer func(ifIndex int) string
// Decorate adds to the flows extra metadata fields that are not directly fetched by eBPF:
// - The interface name (corresponding to the interface index in the flow).
// - The IP address of the agent host.
func Decorate(agentIP net.IP, ifaceNamer InterfaceNamer) func(in <-chan []*Record, out chan<- []*Record) {
return func(in <-chan []*Record, out chan<- []*Record) {
for flows := range in {
for _, flow := range flows {
flow.Interface = ifaceNamer(int(flow.IFIndex))
flow.AgentIP = agentIP
}
out <- flows
}
}
}
......@@ -91,16 +91,16 @@ type Record struct {
// "exclude from aggregation". Otherwise rates, sums, etc... values would be multiplied by the
// number of interfaces this flow is observed from.
Duplicate bool
}
type InterfaceNamer func(ifIndex int) string
// AgentIP provides information about the source of the flow (the Agent that traced it)
AgentIP net.IP
}
func NewRecord(
key RecordKey,
metrics RecordMetrics,
currentTime time.Time,
monotonicCurrentTime uint64,
namer InterfaceNamer,
) *Record {
startDelta := time.Duration(monotonicCurrentTime - metrics.StartMonoTimeNs)
endDelta := time.Duration(monotonicCurrentTime - metrics.EndMonoTimeNs)
......@@ -109,7 +109,6 @@ func NewRecord(
RecordKey: key,
RecordMetrics: metrics,
},
Interface: namer(int(key.IFIndex)),
TimeFlowStart: currentTime.Add(-startDelta),
TimeFlowEnd: currentTime.Add(-endDelta),
}
......
......@@ -16,7 +16,6 @@ var mtlog = logrus.WithField("component", "flow.MapTracer")
// a flow Record structure, and performs the accumulation of each perCPU-record into a single flow
type MapTracer struct {
mapFetcher mapFetcher
interfaceNamer InterfaceNamer
evictionTimeout time.Duration
// manages the access to the eviction routines, avoiding two evictions happening at the same time
evictionCond *sync.Cond
......@@ -27,10 +26,9 @@ type mapFetcher interface {
LookupAndDeleteMap() map[RecordKey][]RecordMetrics
}
func NewMapTracer(fetcher mapFetcher, namer InterfaceNamer, evictionTimeout time.Duration) *MapTracer {
func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer {
return &MapTracer{
mapFetcher: fetcher,
interfaceNamer: namer,
evictionTimeout: evictionTimeout,
lastEvictionNs: uint64(monotime.Now()),
evictionCond: sync.NewCond(&sync.Mutex{}),
......@@ -108,7 +106,6 @@ func (m *MapTracer) evictFlows(forwardFlows chan<- []*Record) {
aggregatedMetrics,
currentTime,
uint64(monotonicTimeNow),
m.interfaceNamer,
))
}
m.lastEvictionNs = laterFlowNs
......
......@@ -36,6 +36,8 @@ func TestGRPCCommunication(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x55667788},
},
}, AgentIp: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0xaabbccdd},
}}},
})
require.NoError(t, err)
......@@ -48,6 +50,8 @@ func TestGRPCCommunication(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x88776655},
},
}, AgentIp: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0xddccbbaa},
}}},
})
require.NoError(t, err)
......@@ -65,6 +69,7 @@ func TestGRPCCommunication(t *testing.T) {
assert.EqualValues(t, 456, r.Bytes)
assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4())
assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4())
assert.EqualValues(t, 0xaabbccdd, r.GetAgentIp().GetIpv4())
select {
case rs = <-serverOut:
case <-time.After(timeout):
......@@ -76,6 +81,7 @@ func TestGRPCCommunication(t *testing.T) {
assert.EqualValues(t, 101, r.Bytes)
assert.EqualValues(t, 0x44332211, r.GetNetwork().GetSrcAddr().GetIpv4())
assert.EqualValues(t, 0x88776655, r.GetNetwork().GetDstAddr().GetIpv4())
assert.EqualValues(t, 0xddccbbaa, r.GetAgentIp().GetIpv4())
select {
case rs = <-serverOut:
......
......@@ -176,6 +176,8 @@ type Record struct {
// if true, the same flow has been recorded via another interface.
// From all the duplicate flows, one will set this value to false and the rest will be true.
Duplicate bool `protobuf:"varint,11,opt,name=duplicate,proto3" json:"duplicate,omitempty"`
// Agent IP address to help identifying the source of the flow
AgentIp *IP `protobuf:"bytes,12,opt,name=agent_ip,json=agentIp,proto3" json:"agent_ip,omitempty"`
}
func (x *Record) Reset() {
......@@ -287,6 +289,13 @@ func (x *Record) GetDuplicate() bool {
return false
}
func (x *Record) GetAgentIp() *IP {
if x != nil {
return x.AgentIp
}
return nil
}
type DataLink struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
......@@ -553,7 +562,7 @@ var file_proto_flow_proto_rawDesc = []byte{
0x07, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x28, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72,
0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x62, 0x66, 0x6c,
0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69,
0x65, 0x73, 0x22, 0xd7, 0x03, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x21, 0x0a,
0x65, 0x73, 0x22, 0xfe, 0x03, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x21, 0x0a,
0x0c, 0x65, 0x74, 0x68, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0d, 0x52, 0x0b, 0x65, 0x74, 0x68, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c,
0x12, 0x2f, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20,
......@@ -582,34 +591,37 @@ var file_proto_flow_proto_rawDesc = []byte{
0x12, 0x1c, 0x0a, 0x09, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, 0x63, 0x65, 0x18, 0x0a, 0x20,
0x01, 0x28, 0x09, 0x52, 0x09, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, 0x63, 0x65, 0x12, 0x1c,
0x0a, 0x09, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x18, 0x0b, 0x20, 0x01, 0x28,
0x08, 0x52, 0x09, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x22, 0x3c, 0x0a, 0x08,
0x44, 0x61, 0x74, 0x61, 0x4c, 0x69, 0x6e, 0x6b, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x72, 0x63, 0x5f,
0x6d, 0x61, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x73, 0x72, 0x63, 0x4d, 0x61,
0x63, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x5f, 0x6d, 0x61, 0x63, 0x18, 0x02, 0x20, 0x01,
0x28, 0x04, 0x52, 0x06, 0x64, 0x73, 0x74, 0x4d, 0x61, 0x63, 0x22, 0x57, 0x0a, 0x07, 0x4e, 0x65,
0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x25, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x5f, 0x61, 0x64, 0x64,
0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77,
0x2e, 0x49, 0x50, 0x52, 0x07, 0x73, 0x72, 0x63, 0x41, 0x64, 0x64, 0x72, 0x12, 0x25, 0x0a, 0x08,
0x64, 0x73, 0x74, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a,
0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x64, 0x73, 0x74, 0x41,
0x64, 0x64, 0x72, 0x22, 0x3d, 0x0a, 0x02, 0x49, 0x50, 0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76,
0x34, 0x18, 0x01, 0x20, 0x01, 0x28, 0x07, 0x48, 0x00, 0x52, 0x04, 0x69, 0x70, 0x76, 0x34, 0x12,
0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x36, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52,
0x04, 0x69, 0x70, 0x76, 0x36, 0x42, 0x0b, 0x0a, 0x09, 0x69, 0x70, 0x5f, 0x66, 0x61, 0x6d, 0x69,
0x6c, 0x79, 0x22, 0x5d, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12,
0x19, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28,
0x0d, 0x52, 0x07, 0x73, 0x72, 0x63, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x73,
0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x64, 0x73,
0x74, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f,
0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f,
0x6c, 0x2a, 0x24, 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0b,
0x0a, 0x07, 0x49, 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x45,
0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x01, 0x32, 0x3e, 0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65,
0x63, 0x74, 0x6f, 0x72, 0x12, 0x31, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x0f, 0x2e, 0x70,
0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x16, 0x2e,
0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72,
0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x70, 0x62, 0x66,
0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x08, 0x52, 0x09, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x08,
0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a,
0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e,
0x74, 0x49, 0x70, 0x22, 0x3c, 0x0a, 0x08, 0x44, 0x61, 0x74, 0x61, 0x4c, 0x69, 0x6e, 0x6b, 0x12,
0x17, 0x0a, 0x07, 0x73, 0x72, 0x63, 0x5f, 0x6d, 0x61, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04,
0x52, 0x06, 0x73, 0x72, 0x63, 0x4d, 0x61, 0x63, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x5f,
0x6d, 0x61, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x64, 0x73, 0x74, 0x4d, 0x61,
0x63, 0x22, 0x57, 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x25, 0x0a, 0x08,
0x73, 0x72, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a,
0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x73, 0x72, 0x63, 0x41,
0x64, 0x64, 0x72, 0x12, 0x25, 0x0a, 0x08, 0x64, 0x73, 0x74, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49,
0x50, 0x52, 0x07, 0x64, 0x73, 0x74, 0x41, 0x64, 0x64, 0x72, 0x22, 0x3d, 0x0a, 0x02, 0x49, 0x50,
0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x34, 0x18, 0x01, 0x20, 0x01, 0x28, 0x07, 0x48, 0x00,
0x52, 0x04, 0x69, 0x70, 0x76, 0x34, 0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x36, 0x18, 0x02,
0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x04, 0x69, 0x70, 0x76, 0x36, 0x42, 0x0b, 0x0a, 0x09,
0x69, 0x70, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x22, 0x5d, 0x0a, 0x09, 0x54, 0x72, 0x61,
0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x5f, 0x70, 0x6f,
0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x73, 0x72, 0x63, 0x50, 0x6f, 0x72,
0x74, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x73, 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20,
0x01, 0x28, 0x0d, 0x52, 0x07, 0x64, 0x73, 0x74, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2a, 0x24, 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65,
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53,
0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x45, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x01, 0x32, 0x3e,
0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x31, 0x0a, 0x04, 0x53,
0x65, 0x6e, 0x64, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63,
0x6f, 0x72, 0x64, 0x73, 0x1a, 0x16, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x43, 0x6f,
0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a,
0x5a, 0x08, 0x2e, 0x2f, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
......@@ -645,15 +657,16 @@ var file_proto_flow_proto_depIdxs = []int32{
4, // 4: pbflow.Record.data_link:type_name -> pbflow.DataLink
5, // 5: pbflow.Record.network:type_name -> pbflow.Network
7, // 6: pbflow.Record.transport:type_name -> pbflow.Transport
6, // 7: pbflow.Network.src_addr:type_name -> pbflow.IP
6, // 8: pbflow.Network.dst_addr:type_name -> pbflow.IP
2, // 9: pbflow.Collector.Send:input_type -> pbflow.Records
1, // 10: pbflow.Collector.Send:output_type -> pbflow.CollectorReply
10, // [10:11] is the sub-list for method output_type
9, // [9:10] is the sub-list for method input_type
9, // [9:9] is the sub-list for extension type_name
9, // [9:9] is the sub-list for extension extendee
0, // [0:9] is the sub-list for field type_name
6, // 7: pbflow.Record.agent_ip:type_name -> pbflow.IP
6, // 8: pbflow.Network.src_addr:type_name -> pbflow.IP
6, // 9: pbflow.Network.dst_addr:type_name -> pbflow.IP
2, // 10: pbflow.Collector.Send:input_type -> pbflow.Records
1, // 11: pbflow.Collector.Send:output_type -> pbflow.CollectorReply
11, // [11:12] is the sub-list for method output_type
10, // [10:11] is the sub-list for method input_type
10, // [10:10] is the sub-list for extension type_name
10, // [10:10] is the sub-list for extension extendee
0, // [0:10] is the sub-list for field type_name
}
func init() { file_proto_flow_proto_init() }
......
......@@ -26,6 +26,7 @@ func (ef *ExporterFake) Export(in <-chan []*flow.Record) {
}
func (ef *ExporterFake) Get(t *testing.T, timeout time.Duration) []*flow.Record {
t.Helper()
select {
case <-time.After(timeout):
t.Fatalf("timeout %s while waiting for a message to be exported", timeout)
......
......@@ -37,6 +37,9 @@ message Record {
// if true, the same flow has been recorded via another interface.
// From all the duplicate flows, one will set this value to false and the rest will be true.
bool duplicate = 11;
// Agent IP address to help identifying the source of the flow
IP agent_ip = 12;
}
message DataLink {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment