Skip to content
Snippets Groups Projects
Unverified Commit 6a351f2b authored by Mario Macias's avatar Mario Macias Committed by GitHub
Browse files

NETOBSERV-617: split big payloads in GRPC exporter (#81)

parent 82d7b1d5
Branches
Tags
No related merge requests found
...@@ -5,6 +5,8 @@ The following environment variables are available to configure the NetObserv eBF ...@@ -5,6 +5,8 @@ The following environment variables are available to configure the NetObserv eBF
* `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc` or `kafka` or `ipfix+tcp` or `ipfix+udp`.
* `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector.
* `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector.
* `GRPC_MESSAGE_MAX_FLOWS` (default: `10000`). Specifies the limit, in number of flows, of each GRPC
message. Messages larger than that number will be split and submitted sequentially.
* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow.
* `AGENT_IP_IFACE` (default: `external`). Specifies which interface should the agent pick the IP
  address from in order to report it in the AgentIP field on each flow. Accepted values are:
......
...@@ -193,7 +193,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) { ...@@ -193,7 +193,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
cfg.TargetHost, cfg.TargetPort) cfg.TargetHost, cfg.TargetPort)
} }
target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort) target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort)
grpcExporter, err := exporter.StartGRPCProto(target) grpcExporter, err := exporter.StartGRPCProto(target, cfg.GRPCMessageMaxFlows)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -42,6 +42,9 @@ type Config struct { ...@@ -42,6 +42,9 @@ type Config struct {
TargetHost string `env:"FLOWS_TARGET_HOST"` TargetHost string `env:"FLOWS_TARGET_HOST"`
// TargetPort is the port the target Flow collector, when the EXPORT variable is set to "grpc" // TargetPort is the port the target Flow collector, when the EXPORT variable is set to "grpc"
TargetPort int `env:"FLOWS_TARGET_PORT"` TargetPort int `env:"FLOWS_TARGET_PORT"`
// GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages
// larger than that number will be split and submitted sequentially.
GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"`
// Interfaces contains the interface names from where flows will be collected. If empty, the agent // Interfaces contains the interface names from where flows will be collected. If empty, the agent
// will fetch all the interfaces in the system, excepting the ones listed in ExcludeInterfaces. // will fetch all the interfaces in the system, excepting the ones listed in ExcludeInterfaces.
// If an entry is enclosed by slashes (e.g. `/br-/`), it will match as regular expression, // If an entry is enclosed by slashes (e.g. `/br-/`), it will match as regular expression,
......
...@@ -16,16 +16,21 @@ var glog = logrus.WithField("component", "exporter/GRPCProto") ...@@ -16,16 +16,21 @@ var glog = logrus.WithField("component", "exporter/GRPCProto")
type GRPCProto struct { type GRPCProto struct {
hostPort string hostPort string
clientConn *grpc.ClientConnection clientConn *grpc.ClientConnection
// maxFlowsPerMessage limits the maximum number of flows per GRPC message.
// If a message contains more flows than this number, the GRPC message will be split into
// multiple messages.
maxFlowsPerMessage int
} }
func StartGRPCProto(hostPort string) (*GRPCProto, error) { func StartGRPCProto(hostPort string, maxFlowsPerMessage int) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostPort) clientConn, err := grpc.ConnectClient(hostPort)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &GRPCProto{ return &GRPCProto{
hostPort: hostPort, hostPort: hostPort,
clientConn: clientConn, clientConn: clientConn,
maxFlowsPerMessage: maxFlowsPerMessage,
}, nil }, nil
} }
...@@ -34,10 +39,11 @@ func StartGRPCProto(hostPort string) (*GRPCProto, error) { ...@@ -34,10 +39,11 @@ func StartGRPCProto(hostPort string) (*GRPCProto, error) {
func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) { func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
log := glog.WithField("collector", g.hostPort) log := glog.WithField("collector", g.hostPort)
for inputRecords := range input { for inputRecords := range input {
pbRecords := flowsToPB(inputRecords) for _, pbRecords := range flowsToPB(inputRecords, g.maxFlowsPerMessage) {
log.Debugf("sending %d records", len(pbRecords.Entries)) log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil { if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
log.WithError(err).Error("couldn't send flow records to collector") log.WithError(err).Error("couldn't send flow records to collector")
}
} }
} }
if err := g.clientConn.Close(); err != nil { if err := g.clientConn.Close(); err != nil {
......
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"testing" "testing"
"time" "time"
test2 "github.com/netobserv/netobserv-ebpf-agent/pkg/test"
"github.com/mariomac/guara/pkg/test" "github.com/mariomac/guara/pkg/test"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc"
...@@ -21,11 +23,12 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) { ...@@ -21,11 +23,12 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
port, err := test.FreeTCPPort() port, err := test.FreeTCPPort()
require.NoError(t, err) require.NoError(t, err)
serverOut := make(chan *pbflow.Records) serverOut := make(chan *pbflow.Records)
_, err = grpc.StartCollector(port, serverOut) coll, err := grpc.StartCollector(port, serverOut)
require.NoError(t, err) require.NoError(t, err)
defer coll.Close()
// Start GRPCProto exporter stage // Start GRPCProto exporter stage
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port)) exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), 1000)
require.NoError(t, err) require.NoError(t, err)
// Send some flows to the input of the exporter stage // Send some flows to the input of the exporter stage
...@@ -37,23 +40,14 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) { ...@@ -37,23 +40,14 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
{RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{EthProtocol: flow.IPv6Type}}, {RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{EthProtocol: flow.IPv6Type}},
AgentIP: net.ParseIP("8888::1111")}, AgentIP: net.ParseIP("8888::1111")},
} }
close(flows)
go exporter.ExportFlows(flows) go exporter.ExportFlows(flows)
var rs *pbflow.Records rs := test2.ReceiveTimeout(t, serverOut, timeout)
select {
case rs = <-serverOut:
case <-time.After(timeout):
require.Fail(t, "timeout waiting for flows")
}
assert.Len(t, rs.Entries, 1) assert.Len(t, rs.Entries, 1)
r := rs.Entries[0] r := rs.Entries[0]
assert.EqualValues(t, 0x0a090807, r.GetAgentIp().GetIpv4()) assert.EqualValues(t, 0x0a090807, r.GetAgentIp().GetIpv4())
select {
case rs = <-serverOut: rs = test2.ReceiveTimeout(t, serverOut, timeout)
case <-time.After(timeout):
require.Fail(t, "timeout waiting for flows")
}
assert.Len(t, rs.Entries, 1) assert.Len(t, rs.Entries, 1)
r = rs.Entries[0] r = rs.Entries[0]
assert.EqualValues(t, net.ParseIP("8888::1111"), r.GetAgentIp().GetIpv6()) assert.EqualValues(t, net.ParseIP("8888::1111"), r.GetAgentIp().GetIpv6())
...@@ -65,3 +59,45 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) { ...@@ -65,3 +59,45 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
//ok! //ok!
} }
} }
func TestGRPCProto_SplitLargeMessages(t *testing.T) {
// start remote ingestor
port, err := test.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
coll, err := grpc.StartCollector(port, serverOut)
require.NoError(t, err)
defer coll.Close()
const msgMaxLen = 10000
// Start GRPCProto exporter stage
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), msgMaxLen)
require.NoError(t, err)
// Send a message much longer than the limit length
flows := make(chan []*flow.Record, 10)
var input []*flow.Record
for i := 0; i < 25000; i++ {
input = append(input, &flow.Record{RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{
EthProtocol: flow.IPv6Type,
}}, AgentIP: net.ParseIP("1111::1111"), Interface: "12345678"})
}
flows <- input
go exporter.ExportFlows(flows)
// expect that the submitted message is split in chunks no longer than msgMaxLen
rs := test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, msgMaxLen)
rs = test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, msgMaxLen)
rs = test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, 5000)
// after all the operation, no more flows are sent
select {
case rs = <-serverOut:
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
default:
//ok!
}
}
...@@ -11,14 +11,21 @@ import ( ...@@ -11,14 +11,21 @@ import (
// flowsToPB is an auxiliary function to convert flow records, as returned by the eBPF agent, // flowsToPB is an auxiliary function to convert flow records, as returned by the eBPF agent,
// into protobuf-encoded messages ready to be sent to the collector via GRPC // into protobuf-encoded messages ready to be sent to the collector via GRPC
func flowsToPB(inputRecords []*flow.Record) *pbflow.Records { func flowsToPB(inputRecords []*flow.Record, maxLen int) []*pbflow.Records {
entries := make([]*pbflow.Record, 0, len(inputRecords)) entries := make([]*pbflow.Record, 0, len(inputRecords))
for _, record := range inputRecords { for _, record := range inputRecords {
entries = append(entries, flowToPB(record)) entries = append(entries, flowToPB(record))
} }
return &pbflow.Records{ var records []*pbflow.Records
Entries: entries, for len(entries) > 0 {
end := len(entries)
if end > maxLen {
end = maxLen
}
records = append(records, &pbflow.Records{Entries: entries[:end]})
entries = entries[end:]
} }
return records
} }
// flowsToPB is an auxiliary function to convert a single flow record, as returned by the eBPF agent, // flowsToPB is an auxiliary function to convert a single flow record, as returned by the eBPF agent,
......
package test
import (
"testing"
"time"
)
// ReceiveTimeout returns the first received element or fails the test if nothing is received
// before the given timeout
func ReceiveTimeout[T any](t *testing.T, ch <-chan T, timeout time.Duration) T {
t.Helper()
select {
case e := <-ch:
return e
case <-time.After(timeout):
var z T
t.Fatalf("timeout while waiting %s for a %T element in channel", timeout, z)
return z
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment