diff --git a/pkg/exporter/kafka_proto.go b/pkg/exporter/kafka_proto.go index 0235cab9293a4a3148b4e47ab92aa0d9d67e7275..61338e8d70460dc0aee93d7adbf9ab54cad622d8 100644 --- a/pkg/exporter/kafka_proto.go +++ b/pkg/exporter/kafka_proto.go @@ -28,6 +28,18 @@ func (kp *KafkaProto) ExportFlows(input <-chan []*flow.Record) { } } +func getFlowKey(record *flow.Record) []byte { + // We are sorting IP address so flows from on ip to a second IP get the same key whatever the direction is + for k := range record.Id.SrcIp { + if record.Id.SrcIp[k] < record.Id.DstIp[k] { + return append(record.Id.SrcIp[:], record.Id.DstIp[:]...) + } else if record.Id.SrcIp[k] > record.Id.DstIp[k] { + return append(record.Id.DstIp[:], record.Id.SrcIp[:]...) + } + } + return append(record.Id.SrcIp[:], record.Id.DstIp[:]...) +} + func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) { klog.Debugf("sending %d records", len(records)) msgs := make([]kafkago.Message, 0, len(records)) @@ -37,7 +49,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) { klog.WithError(err).Debug("can't encode protobuf message. Ignoring") continue } - msgs = append(msgs, kafkago.Message{Value: pbBytes}) + msgs = append(msgs, kafkago.Message{Value: pbBytes, Key: getFlowKey(record)}) } if err := kp.Writer.WriteMessages(context.TODO(), msgs...); err != nil { diff --git a/pkg/exporter/kafka_proto_test.go b/pkg/exporter/kafka_proto_test.go index 884bd73ce4828c1e772edc544087c38f9863d618..d07028c80cee93155d5f8215ac81bf6457378064 100644 --- a/pkg/exporter/kafka_proto_test.go +++ b/pkg/exporter/kafka_proto_test.go @@ -21,6 +21,12 @@ func IPAddrFromNetIP(netIP net.IP) flow.IPAddr { return arr } +func ByteArrayFromNetIP(netIP net.IP) []uint8 { + var arr [net.IPv6len]uint8 + copy(arr[:], (netIP)[0:net.IPv6len]) + return arr[:] +} + func TestProtoConversion(t *testing.T) { wc := writerCapturer{} kj := KafkaProto{Writer: &wc} @@ -66,6 +72,38 @@ func TestProtoConversion(t *testing.T) { assert.EqualValues(t, 987, r.Packets) assert.EqualValues(t, uint16(1), r.Flags) assert.Equal(t, "veth0", r.Interface) + assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("127.3.2.1")), wc.messages[0].Key[0:16]) + assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("192.1.2.3")), wc.messages[0].Key[16:]) +} + +func TestIdenticalKeys(t *testing.T) { + record := flow.Record{} + record.Id.EthProtocol = 3 + record.Id.Direction = 1 + record.Id.SrcMac = [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff} + record.Id.DstMac = [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66} + record.Id.SrcIp = IPAddrFromNetIP(net.ParseIP("192.1.2.3")) + record.Id.DstIp = IPAddrFromNetIP(net.ParseIP("127.3.2.1")) + record.Id.SrcPort = 4321 + record.Id.DstPort = 1234 + record.Id.IcmpType = 8 + record.Id.TransportProtocol = 210 + record.TimeFlowStart = time.Now().Add(-5 * time.Second) + record.TimeFlowEnd = time.Now() + record.Metrics.Bytes = 789 + record.Metrics.Packets = 987 + record.Metrics.Flags = uint16(1) + record.Interface = "veth0" + + key1 := getFlowKey(&record) + + record.Id.SrcIp = IPAddrFromNetIP(net.ParseIP("127.3.2.1")) + record.Id.DstIp = IPAddrFromNetIP(net.ParseIP("192.1.2.3")) + key2 := getFlowKey(&record) + + // Both keys should be identical + assert.Equal(t, key1, key2) + } type writerCapturer struct {