From 2dca243a8dc5aa449aa0ff18a6924252cfab6c79 Mon Sep 17 00:00:00 2001 From: Olivier Cazade <olivier.cazade@gmail.com> Date: Thu, 6 Apr 2023 14:01:54 +0200 Subject: [PATCH] NETOBSERV-926: Added key to kafka message for connection tracking consistency (#107) * Added key to kafka message for connection tracking consistency * Added comment about why we are sorting IP address for generating kafka key --- pkg/exporter/kafka_proto.go | 14 +++++++++++- pkg/exporter/kafka_proto_test.go | 38 ++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/pkg/exporter/kafka_proto.go b/pkg/exporter/kafka_proto.go index 0235cab9..61338e8d 100644 --- a/pkg/exporter/kafka_proto.go +++ b/pkg/exporter/kafka_proto.go @@ -28,6 +28,18 @@ func (kp *KafkaProto) ExportFlows(input <-chan []*flow.Record) { } } +func getFlowKey(record *flow.Record) []byte { + // We are sorting IP address so flows from on ip to a second IP get the same key whatever the direction is + for k := range record.Id.SrcIp { + if record.Id.SrcIp[k] < record.Id.DstIp[k] { + return append(record.Id.SrcIp[:], record.Id.DstIp[:]...) + } else if record.Id.SrcIp[k] > record.Id.DstIp[k] { + return append(record.Id.DstIp[:], record.Id.SrcIp[:]...) + } + } + return append(record.Id.SrcIp[:], record.Id.DstIp[:]...) +} + func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) { klog.Debugf("sending %d records", len(records)) msgs := make([]kafkago.Message, 0, len(records)) @@ -37,7 +49,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) { klog.WithError(err).Debug("can't encode protobuf message. Ignoring") continue } - msgs = append(msgs, kafkago.Message{Value: pbBytes}) + msgs = append(msgs, kafkago.Message{Value: pbBytes, Key: getFlowKey(record)}) } if err := kp.Writer.WriteMessages(context.TODO(), msgs...); err != nil { diff --git a/pkg/exporter/kafka_proto_test.go b/pkg/exporter/kafka_proto_test.go index 884bd73c..d07028c8 100644 --- a/pkg/exporter/kafka_proto_test.go +++ b/pkg/exporter/kafka_proto_test.go @@ -21,6 +21,12 @@ func IPAddrFromNetIP(netIP net.IP) flow.IPAddr { return arr } +func ByteArrayFromNetIP(netIP net.IP) []uint8 { + var arr [net.IPv6len]uint8 + copy(arr[:], (netIP)[0:net.IPv6len]) + return arr[:] +} + func TestProtoConversion(t *testing.T) { wc := writerCapturer{} kj := KafkaProto{Writer: &wc} @@ -66,6 +72,38 @@ func TestProtoConversion(t *testing.T) { assert.EqualValues(t, 987, r.Packets) assert.EqualValues(t, uint16(1), r.Flags) assert.Equal(t, "veth0", r.Interface) + assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("127.3.2.1")), wc.messages[0].Key[0:16]) + assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("192.1.2.3")), wc.messages[0].Key[16:]) +} + +func TestIdenticalKeys(t *testing.T) { + record := flow.Record{} + record.Id.EthProtocol = 3 + record.Id.Direction = 1 + record.Id.SrcMac = [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff} + record.Id.DstMac = [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66} + record.Id.SrcIp = IPAddrFromNetIP(net.ParseIP("192.1.2.3")) + record.Id.DstIp = IPAddrFromNetIP(net.ParseIP("127.3.2.1")) + record.Id.SrcPort = 4321 + record.Id.DstPort = 1234 + record.Id.IcmpType = 8 + record.Id.TransportProtocol = 210 + record.TimeFlowStart = time.Now().Add(-5 * time.Second) + record.TimeFlowEnd = time.Now() + record.Metrics.Bytes = 789 + record.Metrics.Packets = 987 + record.Metrics.Flags = uint16(1) + record.Interface = "veth0" + + key1 := getFlowKey(&record) + + record.Id.SrcIp = IPAddrFromNetIP(net.ParseIP("127.3.2.1")) + record.Id.DstIp = IPAddrFromNetIP(net.ParseIP("192.1.2.3")) + key2 := getFlowKey(&record) + + // Both keys should be identical + assert.Equal(t, key1, key2) + } type writerCapturer struct { -- GitLab