Skip to content
Snippets Groups Projects
Unverified Commit 2dca243a authored by Olivier Cazade's avatar Olivier Cazade Committed by GitHub
Browse files

NETOBSERV-926: Added key to kafka message for connection tracking consistency (#107)

* Added key to kafka message for connection tracking consistency

* Added comment about why we are sorting IP address for generating kafka key
parent c62173ac
No related branches found
No related tags found
No related merge requests found
......@@ -28,6 +28,18 @@ func (kp *KafkaProto) ExportFlows(input <-chan []*flow.Record) {
}
}
func getFlowKey(record *flow.Record) []byte {
// We are sorting IP address so flows from on ip to a second IP get the same key whatever the direction is
for k := range record.Id.SrcIp {
if record.Id.SrcIp[k] < record.Id.DstIp[k] {
return append(record.Id.SrcIp[:], record.Id.DstIp[:]...)
} else if record.Id.SrcIp[k] > record.Id.DstIp[k] {
return append(record.Id.DstIp[:], record.Id.SrcIp[:]...)
}
}
return append(record.Id.SrcIp[:], record.Id.DstIp[:]...)
}
func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) {
klog.Debugf("sending %d records", len(records))
msgs := make([]kafkago.Message, 0, len(records))
......@@ -37,7 +49,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) {
klog.WithError(err).Debug("can't encode protobuf message. Ignoring")
continue
}
msgs = append(msgs, kafkago.Message{Value: pbBytes})
msgs = append(msgs, kafkago.Message{Value: pbBytes, Key: getFlowKey(record)})
}
if err := kp.Writer.WriteMessages(context.TODO(), msgs...); err != nil {
......
......@@ -21,6 +21,12 @@ func IPAddrFromNetIP(netIP net.IP) flow.IPAddr {
return arr
}
func ByteArrayFromNetIP(netIP net.IP) []uint8 {
var arr [net.IPv6len]uint8
copy(arr[:], (netIP)[0:net.IPv6len])
return arr[:]
}
func TestProtoConversion(t *testing.T) {
wc := writerCapturer{}
kj := KafkaProto{Writer: &wc}
......@@ -66,6 +72,38 @@ func TestProtoConversion(t *testing.T) {
assert.EqualValues(t, 987, r.Packets)
assert.EqualValues(t, uint16(1), r.Flags)
assert.Equal(t, "veth0", r.Interface)
assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("127.3.2.1")), wc.messages[0].Key[0:16])
assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("192.1.2.3")), wc.messages[0].Key[16:])
}
func TestIdenticalKeys(t *testing.T) {
record := flow.Record{}
record.Id.EthProtocol = 3
record.Id.Direction = 1
record.Id.SrcMac = [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff}
record.Id.DstMac = [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66}
record.Id.SrcIp = IPAddrFromNetIP(net.ParseIP("192.1.2.3"))
record.Id.DstIp = IPAddrFromNetIP(net.ParseIP("127.3.2.1"))
record.Id.SrcPort = 4321
record.Id.DstPort = 1234
record.Id.IcmpType = 8
record.Id.TransportProtocol = 210
record.TimeFlowStart = time.Now().Add(-5 * time.Second)
record.TimeFlowEnd = time.Now()
record.Metrics.Bytes = 789
record.Metrics.Packets = 987
record.Metrics.Flags = uint16(1)
record.Interface = "veth0"
key1 := getFlowKey(&record)
record.Id.SrcIp = IPAddrFromNetIP(net.ParseIP("127.3.2.1"))
record.Id.DstIp = IPAddrFromNetIP(net.ParseIP("192.1.2.3"))
key2 := getFlowKey(&record)
// Both keys should be identical
assert.Equal(t, key1, key2)
}
type writerCapturer struct {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment