diff --git a/docs/config.md b/docs/config.md index 5fec13f3ce054057bd23f44ec0cbb8f289e39326..22827e9c92becf09238aaf92d19c3c134b5996e9 100644 --- a/docs/config.md +++ b/docs/config.md @@ -5,6 +5,8 @@ The following environment variables are available to configure the NetObserv eBF * `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc` or `kafka` or `ipfix+tcp` or `ipfix+udp`. * `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector. * `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector. +* `GRPC_MESSAGE_MAX_FLOWS` (default: `10000`). Specifies the limit, in number of flows, of each GRPC + message. Messages larger than that number will be split and submitted sequentially. * `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow. * `AGENT_IP_IFACE` (default: `external`). Specifies which interface should the agent pick the IP address from in order to report it in the AgentIP field on each flow. 
Accepted values are: diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 91d72b6d232bae79f83fe0af7a7931943977084a..3bfd80685744eeb139e7b1cdb58a150817d89a47 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -193,7 +193,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) { cfg.TargetHost, cfg.TargetPort) } target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort) - grpcExporter, err := exporter.StartGRPCProto(target) + grpcExporter, err := exporter.StartGRPCProto(target, cfg.GRPCMessageMaxFlows) if err != nil { return nil, err } diff --git a/pkg/agent/config.go b/pkg/agent/config.go index c2edd7edfe4f3fc7a6bd172e0d1a17e135596eb8..8f5cef529e2250e38ba63f725e24c7f6284bda94 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -42,6 +42,9 @@ type Config struct { TargetHost string `env:"FLOWS_TARGET_HOST"` // TargetPort is the port the target Flow collector, when the EXPORT variable is set to "grpc" TargetPort int `env:"FLOWS_TARGET_PORT"` + // GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages + // larger than that number will be split and submitted sequentially. + GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"` // Interfaces contains the interface names from where flows will be collected. If empty, the agent // will fetch all the interfaces in the system, excepting the ones listed in ExcludeInterfaces. // If an entry is enclosed by slashes (e.g. 
`/br-/`), it will match as regular expression, diff --git a/pkg/exporter/grpc_proto.go b/pkg/exporter/grpc_proto.go index 517aff07b0db19c5a14f35a7e9c2405131be9560..13129190a0bcf410698c13047a8b2955f4148ccf 100644 --- a/pkg/exporter/grpc_proto.go +++ b/pkg/exporter/grpc_proto.go @@ -16,16 +16,21 @@ var glog = logrus.WithField("component", "exporter/GRPCProto") type GRPCProto struct { hostPort string clientConn *grpc.ClientConnection + // maxFlowsPerMessage limits the maximum number of flows per GRPC message. + // If a message contains more flows than this number, the GRPC message will be split into + // multiple messages. + maxFlowsPerMessage int } -func StartGRPCProto(hostPort string) (*GRPCProto, error) { +func StartGRPCProto(hostPort string, maxFlowsPerMessage int) (*GRPCProto, error) { clientConn, err := grpc.ConnectClient(hostPort) if err != nil { return nil, err } return &GRPCProto{ - hostPort: hostPort, - clientConn: clientConn, + hostPort: hostPort, + clientConn: clientConn, + maxFlowsPerMessage: maxFlowsPerMessage, }, nil } @@ -34,10 +39,11 @@ func StartGRPCProto(hostPort string) (*GRPCProto, error) { func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) { log := glog.WithField("collector", g.hostPort) for inputRecords := range input { - pbRecords := flowsToPB(inputRecords) - log.Debugf("sending %d records", len(pbRecords.Entries)) - if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil { - log.WithError(err).Error("couldn't send flow records to collector") + for _, pbRecords := range flowsToPB(inputRecords, g.maxFlowsPerMessage) { + log.Debugf("sending %d records", len(pbRecords.Entries)) + if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil { + log.WithError(err).Error("couldn't send flow records to collector") + } } } if err := g.clientConn.Close(); err != nil { diff --git a/pkg/exporter/grpc_proto_test.go b/pkg/exporter/grpc_proto_test.go index 
18f760ec36ce72c6bdbc0aae7e5a80e550d2e1ad..e48fcbe39f9e0ef89cb360b40a85d2cfe8b8ed50 100644 --- a/pkg/exporter/grpc_proto_test.go +++ b/pkg/exporter/grpc_proto_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + test2 "github.com/netobserv/netobserv-ebpf-agent/pkg/test" + "github.com/mariomac/guara/pkg/test" "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" @@ -21,11 +23,12 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) { port, err := test.FreeTCPPort() require.NoError(t, err) serverOut := make(chan *pbflow.Records) - _, err = grpc.StartCollector(port, serverOut) + coll, err := grpc.StartCollector(port, serverOut) require.NoError(t, err) + defer coll.Close() // Start GRPCProto exporter stage - exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port)) + exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), 1000) require.NoError(t, err) // Send some flows to the input of the exporter stage @@ -37,23 +40,14 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) { {RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{EthProtocol: flow.IPv6Type}}, AgentIP: net.ParseIP("8888::1111")}, } - close(flows) go exporter.ExportFlows(flows) - var rs *pbflow.Records - select { - case rs = <-serverOut: - case <-time.After(timeout): - require.Fail(t, "timeout waiting for flows") - } + rs := test2.ReceiveTimeout(t, serverOut, timeout) assert.Len(t, rs.Entries, 1) r := rs.Entries[0] assert.EqualValues(t, 0x0a090807, r.GetAgentIp().GetIpv4()) - select { - case rs = <-serverOut: - case <-time.After(timeout): - require.Fail(t, "timeout waiting for flows") - } + + rs = test2.ReceiveTimeout(t, serverOut, timeout) assert.Len(t, rs.Entries, 1) r = rs.Entries[0] assert.EqualValues(t, net.ParseIP("8888::1111"), r.GetAgentIp().GetIpv6()) @@ -65,3 +59,45 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) { //ok! 
} } + +func TestGRPCProto_SplitLargeMessages(t *testing.T) { + // start remote ingestor + port, err := test.FreeTCPPort() + require.NoError(t, err) + serverOut := make(chan *pbflow.Records) + coll, err := grpc.StartCollector(port, serverOut) + require.NoError(t, err) + defer coll.Close() + + const msgMaxLen = 10000 + // Start GRPCProto exporter stage + exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), msgMaxLen) + require.NoError(t, err) + + // Send a message much longer than the limit length + flows := make(chan []*flow.Record, 10) + var input []*flow.Record + for i := 0; i < 25000; i++ { + input = append(input, &flow.Record{RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{ + EthProtocol: flow.IPv6Type, + }}, AgentIP: net.ParseIP("1111::1111"), Interface: "12345678"}) + } + flows <- input + go exporter.ExportFlows(flows) + + // expect that the submitted message is split in chunks no longer than msgMaxLen + rs := test2.ReceiveTimeout(t, serverOut, timeout) + assert.Len(t, rs.Entries, msgMaxLen) + rs = test2.ReceiveTimeout(t, serverOut, timeout) + assert.Len(t, rs.Entries, msgMaxLen) + rs = test2.ReceiveTimeout(t, serverOut, timeout) + assert.Len(t, rs.Entries, 5000) + + // after all the operation, no more flows are sent + select { + case rs = <-serverOut: + assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs) + default: + //ok! 
+	}
+}
diff --git a/pkg/exporter/proto.go b/pkg/exporter/proto.go
index 7a1075e9e4b9c0c94b36c7f4e4fc0207551a081d..a47048b9068016133c57647339f7fdce733674ec 100644
--- a/pkg/exporter/proto.go
+++ b/pkg/exporter/proto.go
@@ -11,14 +11,30 @@ import (
 
 // flowsToPB is an auxiliary function to convert flow records, as returned by the eBPF agent,
-// into protobuf-encoded messages ready to be sent to the collector via GRPC
-func flowsToPB(inputRecords []*flow.Record) *pbflow.Records {
+// into protobuf-encoded messages ready to be sent to the collector via GRPC.
+// The records are split, in order, into consecutive messages of at most maxLen entries each.
+// A non-positive maxLen disables the limit, so all the records go in a single message.
+// An empty input produces no messages.
+func flowsToPB(inputRecords []*flow.Record, maxLen int) []*pbflow.Records {
 	entries := make([]*pbflow.Record, 0, len(inputRecords))
 	for _, record := range inputRecords {
 		entries = append(entries, flowToPB(record))
 	}
-	return &pbflow.Records{
-		Entries: entries,
+	// A zero limit would make the loop below spin forever without consuming entries, and a
+	// negative one would panic when slicing, so both are treated as "no limit".
+	if maxLen <= 0 {
+		maxLen = len(entries)
+	}
+	var records []*pbflow.Records
+	for len(entries) > 0 {
+		end := len(entries)
+		if end > maxLen {
+			end = maxLen
+		}
+		// The capacity is capped so appending to one chunk can't overwrite the next one.
+		records = append(records, &pbflow.Records{Entries: entries[:end:end]})
+		entries = entries[end:]
 	}
+	return records
 }
 
 // flowsToPB is an auxiliary function to convert a single flow record, as returned by the eBPF agent,
diff --git a/pkg/test/channels.go b/pkg/test/channels.go
new file mode 100644
index 0000000000000000000000000000000000000000..d24e44936fb8b5e3b11ed67066429a3b98566dc1
--- /dev/null
+++ b/pkg/test/channels.go
@@ -0,0 +1,20 @@
+package test
+
+import (
+	"testing"
+	"time"
+)
+
+// ReceiveTimeout returns the first received element or fails the test if nothing is received
+// before the given timeout
+func ReceiveTimeout[T any](t *testing.T, ch <-chan T, timeout time.Duration) T {
+	t.Helper()
+	select {
+	case e := <-ch:
+		return e
+	case <-time.After(timeout):
+		var z T
+		t.Fatalf("timeout while waiting %s for a %T element in channel", timeout, z)
+		return z
+	}
+}