Skip to content
Snippets Groups Projects
Unverified Commit 45b4757d authored by shach33's avatar shach33 Committed by GitHub
Browse files

Added TCP Flags to Flow Record metrics (#68)


* Test commit

* Adding TCP flags

* Adding TCP flags to record metrics.TODO: Remove log stmts

* Updated tcp flags u32-> u16

* Updated indentation in C file

* Update flow.h

* Indent fixed. Tab -> 4 space

fixing indents

Indent fixes for flows.c

Indent fixes for flows.c-II

Still fixing indents

* Removed extra file

* Added clang-format config to bpf/. To use clang-format -i <file.c>

* Fixed message record member sequences

* Added TCP Flags combinations Read from flw

Updated changes according to comments on PR

* Removed log statements.

Rebase merges

* Fixing merge conflits

* Revert the renaming of grpc_test file

* Simplify setting flags for v4,v6

* Add flags to ipfix exporter

* Update protobuf

* Remove type conversions due to conflicts

* Remove .gitignore change

* Fix indentation

* Remove commented line

* Fixed typecast errors since move to go 1.18

* Reverting commit d1e9c44. Added changes as separate PR

* set flags for v6

Co-authored-by: default avatarPravein Govindan Kannan <pravein.govindan.kannan@ibm.com>
parent 03304537
Branches
Tags
No related merge requests found
Showing with 238 additions and 166 deletions
{
BasedOnStyle: LLVM,
AllowShortFunctionsOnASingleLine: InlineOnly,
ColumnLimit: 100,
IndentWidth: 4,
SortIncludes: false,
ReflowComments: false,
TabWidth: 4,
}
...@@ -17,6 +17,8 @@ typedef struct flow_metrics_t { ...@@ -17,6 +17,8 @@ typedef struct flow_metrics_t {
// as output from bpf_ktime_get_ns() // as output from bpf_ktime_get_ns()
u64 start_mono_time_ts; u64 start_mono_time_ts;
u64 end_mono_time_ts; u64 end_mono_time_ts;
// TCP Flags from https://www.ietf.org/rfc/rfc793.txt
u16 flags;
// The positive errno of a failed map insertion that caused a flow // The positive errno of a failed map insertion that caused a flow
// to be sent via ringbuffer. // to be sent via ringbuffer.
// 0 otherwise // 0 otherwise
......
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
#include <linux/udp.h> #include <linux/udp.h>
#include <linux/tcp.h> #include <linux/tcp.h>
#include <string.h> #include <string.h>
#include <stdbool.h> #include <stdbool.h>
#include <linux/if_ether.h> #include <linux/if_ether.h>
...@@ -42,6 +41,20 @@ ...@@ -42,6 +41,20 @@
#define INGRESS 0 #define INGRESS 0
#define EGRESS 1 #define EGRESS 1
// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml
#define FIN_FLAG 0x01
#define SYN_FLAG 0x02
#define RST_FLAG 0x04
#define PSH_FLAG 0x08
#define ACK_FLAG 0x10
#define URG_FLAG 0x20
#define ECE_FLAG 0x40
#define CWR_FLAG 0x80
// Custom flags exported
#define SYN_ACK_FLAG 0x100
#define FIN_ACK_FLAG 0x200
#define RST_ACK_FLAG 0x400
// Common Ringbuffer as a conduit for ingress/egress flows to userspace // Common Ringbuffer as a conduit for ingress/egress flows to userspace
struct { struct {
__uint(type, BPF_MAP_TYPE_RINGBUF); __uint(type, BPF_MAP_TYPE_RINGBUF);
...@@ -62,8 +75,35 @@ volatile const u8 trace_messages = 0; ...@@ -62,8 +75,35 @@ volatile const u8 trace_messages = 0;
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};
// sets the TCP header flags for connection information
static inline void set_flags(struct tcphdr *th, u16 *flags) {
//If both ACK and SYN are set, then it is server -> client communication during 3-way handshake.
if (th->ack && th->syn) {
*flags |= SYN_ACK_FLAG;
} else if (th->ack && th->fin ) {
// If both ACK and FIN are set, then it is graceful termination from server.
*flags |= FIN_ACK_FLAG;
} else if (th->ack && th->rst ) {
// If both ACK and RST are set, then it is abrupt connection termination.
*flags |= RST_ACK_FLAG;
} else if (th->fin) {
*flags |= FIN_FLAG;
} else if (th->syn) {
*flags |= SYN_FLAG;
} else if (th->rst) {
*flags |= RST_FLAG;
} else if (th->psh) {
*flags |= PSH_FLAG;
} else if (th->urg) {
*flags |= URG_FLAG;
} else if (th->ece) {
*flags |= ECE_FLAG;
} else if (th->cwr) {
*flags |= CWR_FLAG;
}
}
// sets flow fields from IPv4 header information // sets flow fields from IPv4 header information
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) { static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) {
if ((void *)ip + sizeof(*ip) > data_end) { if ((void *)ip + sizeof(*ip) > data_end) {
return DISCARD; return DISCARD;
} }
...@@ -81,6 +121,7 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) { ...@@ -81,6 +121,7 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) {
if ((void *)tcp + sizeof(*tcp) <= data_end) { if ((void *)tcp + sizeof(*tcp) <= data_end) {
id->src_port = __bpf_ntohs(tcp->source); id->src_port = __bpf_ntohs(tcp->source);
id->dst_port = __bpf_ntohs(tcp->dest); id->dst_port = __bpf_ntohs(tcp->dest);
set_flags(tcp, flags);
} }
} break; } break;
case IPPROTO_UDP: { case IPPROTO_UDP: {
...@@ -97,7 +138,7 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) { ...@@ -97,7 +138,7 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) {
} }
// sets flow fields from IPv6 header information // sets flow fields from IPv6 header information
static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) { static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id, u16 *flags) {
if ((void *)ip + sizeof(*ip) > data_end) { if ((void *)ip + sizeof(*ip) > data_end) {
return DISCARD; return DISCARD;
} }
...@@ -113,6 +154,7 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) { ...@@ -113,6 +154,7 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) {
if ((void *)tcp + sizeof(*tcp) <= data_end) { if ((void *)tcp + sizeof(*tcp) <= data_end) {
id->src_port = __bpf_ntohs(tcp->source); id->src_port = __bpf_ntohs(tcp->source);
id->dst_port = __bpf_ntohs(tcp->dest); id->dst_port = __bpf_ntohs(tcp->dest);
set_flags(tcp, flags);
} }
} break; } break;
case IPPROTO_UDP: { case IPPROTO_UDP: {
...@@ -128,7 +170,7 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) { ...@@ -128,7 +170,7 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) {
return SUBMIT; return SUBMIT;
} }
// sets flow fields from Ethernet header information // sets flow fields from Ethernet header information
static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) { static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u16 *flags) {
if ((void *)eth + sizeof(*eth) > data_end) { if ((void *)eth + sizeof(*eth) > data_end) {
return DISCARD; return DISCARD;
} }
...@@ -138,15 +180,15 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) { ...@@ -138,15 +180,15 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) {
if (id->eth_protocol == ETH_P_IP) { if (id->eth_protocol == ETH_P_IP) {
struct iphdr *ip = (void *)eth + sizeof(*eth); struct iphdr *ip = (void *)eth + sizeof(*eth);
return fill_iphdr(ip, data_end, id); return fill_iphdr(ip, data_end, id, flags);
} else if (id->eth_protocol == ETH_P_IPV6) { } else if (id->eth_protocol == ETH_P_IPV6) {
struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth); struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth);
return fill_ip6hdr(ip6, data_end, id); return fill_ip6hdr(ip6, data_end, id, flags);
} else { } else {
// TODO : Need to implement other specific ethertypes if needed // TODO : Need to implement other specific ethertypes if needed
// For now other parts of flow id remain zero // For now other parts of flow id remain zero
memset (&(id->src_ip),0, sizeof(struct in6_addr)); memset(&(id->src_ip), 0, sizeof(struct in6_addr));
memset (&(id->dst_ip),0, sizeof(struct in6_addr)); memset(&(id->dst_ip), 0, sizeof(struct in6_addr));
id->transport_protocol = 0; id->transport_protocol = 0;
id->src_port = 0; id->src_port = 0;
id->dst_port = 0; id->dst_port = 0;
...@@ -154,7 +196,6 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) { ...@@ -154,7 +196,6 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) {
return SUBMIT; return SUBMIT;
} }
static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows // If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) { if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
...@@ -166,7 +207,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { ...@@ -166,7 +207,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
flow_id id; flow_id id;
u64 current_time = bpf_ktime_get_ns(); u64 current_time = bpf_ktime_get_ns();
struct ethhdr *eth = data; struct ethhdr *eth = data;
if (fill_ethhdr(eth, data_end, &id) == DISCARD) { u16 flags = 0;
if (fill_ethhdr(eth, data_end, &id, &flags) == DISCARD) {
return TC_ACT_OK; return TC_ACT_OK;
} }
id.if_index = skb->ifindex; id.if_index = skb->ifindex;
...@@ -184,7 +226,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { ...@@ -184,7 +226,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
if (aggregate_flow->start_mono_time_ts == 0) { if (aggregate_flow->start_mono_time_ts == 0) {
aggregate_flow->start_mono_time_ts = current_time; aggregate_flow->start_mono_time_ts = current_time;
} }
aggregate_flow->flags |= flags;
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY); long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (trace_messages && ret != 0) { if (trace_messages && ret != 0) {
// usually error -16 (-EBUSY) is printed here. // usually error -16 (-EBUSY) is printed here.
...@@ -198,9 +240,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { ...@@ -198,9 +240,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// Key does not exist in the map, and will need to create a new entry. // Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow = { flow_metrics new_flow = {
.packets = 1, .packets = 1,
.bytes=skb->len, .bytes = skb->len,
.start_mono_time_ts = current_time, .start_mono_time_ts = current_time,
.end_mono_time_ts = current_time, .end_mono_time_ts = current_time,
.flags = flags,
}; };
// even if we know that the entry is new, another CPU might be concurrently inserting a flow // even if we know that the entry is new, another CPU might be concurrently inserting a flow
...@@ -230,15 +273,14 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { ...@@ -230,15 +273,14 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
} }
} }
return TC_ACT_OK; return TC_ACT_OK;
} }
SEC("tc_ingress") SEC("tc_ingress")
int ingress_flow_parse (struct __sk_buff *skb) { int ingress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb, INGRESS); return flow_monitor(skb, INGRESS);
} }
SEC("tc_egress") SEC("tc_egress")
int egress_flow_parse (struct __sk_buff *skb) { int egress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb, EGRESS); return flow_monitor(skb, EGRESS);
} }
char _license[] SEC("license") = "GPL"; char _license[] SEC("license") = "GPL";
...@@ -61,7 +61,7 @@ func main() { ...@@ -61,7 +61,7 @@ func main() {
log.SetFlags(0) log.SetFlags(0)
flag.Parse() flag.Parse()
receivedRecords := make(chan *pbflow.Records, 100) receivedRecords := make(chan *pbflow.Records, 1000)
log.Println("starting flowlogs-dump-collector on port", *port) log.Println("starting flowlogs-dump-collector on port", *port)
go func() { go func() {
_, err := grpc.StartCollector(*port, receivedRecords) _, err := grpc.StartCollector(*port, receivedRecords)
...@@ -72,7 +72,7 @@ func main() { ...@@ -72,7 +72,7 @@ func main() {
for records := range receivedRecords { for records := range receivedRecords {
for _, record := range records.Entries { for _, record := range records.Entries {
if record.EthProtocol == ipv6 { if record.EthProtocol == ipv6 {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d ends: %v\n", log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d flags:%d ends: %v\n",
ipProto[record.EthProtocol], ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"), record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface, record.Interface,
...@@ -84,10 +84,11 @@ func main() { ...@@ -84,10 +84,11 @@ func main() {
record.Direction, record.Direction,
record.Bytes, record.Bytes,
record.Packets, record.Packets,
record.Flags,
record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"), record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"),
) )
} else { } else {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d ends: %v\n", log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d flags:%d ends: %v\n",
ipProto[record.EthProtocol], ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"), record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface, record.Interface,
...@@ -99,6 +100,7 @@ func main() { ...@@ -99,6 +100,7 @@ func main() {
record.Direction, record.Direction,
record.Bytes, record.Bytes,
record.Packets, record.Packets,
record.Flags,
record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"), record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"),
) )
} }
......
...@@ -6,6 +6,7 @@ require ( ...@@ -6,6 +6,7 @@ require (
github.com/caarlos0/env/v6 v6.9.1 github.com/caarlos0/env/v6 v6.9.1
github.com/cilium/ebpf v0.8.1 github.com/cilium/ebpf v0.8.1
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424
github.com/golang/protobuf v1.5.2
github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f
github.com/netobserv/gopipes v0.3.0 github.com/netobserv/gopipes v0.3.0
github.com/paulbellamy/ratecounter v0.2.0 github.com/paulbellamy/ratecounter v0.2.0
...@@ -35,7 +36,6 @@ require ( ...@@ -35,7 +36,6 @@ require (
github.com/go-openapi/jsonreference v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/swag v0.19.14 // indirect github.com/go-openapi/swag v0.19.14 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.7 // indirect github.com/google/go-cmp v0.5.7 // indirect
github.com/google/gofuzz v1.1.0 // indirect github.com/google/gofuzz v1.1.0 // indirect
......
No preview for this file type
No preview for this file type
...@@ -39,6 +39,42 @@ func addElementToTemplate(log *logrus.Entry, elementName string, value []byte, e ...@@ -39,6 +39,42 @@ func addElementToTemplate(log *logrus.Entry, elementName string, value []byte, e
return nil return nil
} }
func AddRecordValuesToTemplate(log *logrus.Entry, elements *[]entities.InfoElementWithValue) error {
err := addElementToTemplate(log, "octetDeltaCount", nil, elements)
if err != nil {
return err
}
err = addElementToTemplate(log, "tcpControlBits", nil, elements)
if err != nil {
return err
}
err = addElementToTemplate(log, "flowStartSeconds", nil, elements)
if err != nil {
return err
}
err = addElementToTemplate(log, "flowStartMilliseconds", nil, elements)
if err != nil {
return err
}
err = addElementToTemplate(log, "flowEndSeconds", nil, elements)
if err != nil {
return err
}
err = addElementToTemplate(log, "flowEndMilliseconds", nil, elements)
if err != nil {
return err
}
err = addElementToTemplate(log, "packetDeltaCount", nil, elements)
if err != nil {
return err
}
err = addElementToTemplate(log, "interfaceName", nil, elements)
if err != nil {
return err
}
return nil
}
func SendTemplateRecordv4(log *logrus.Entry, exporter *ipfixExporter.ExportingProcess) (uint16, []entities.InfoElementWithValue, error) { func SendTemplateRecordv4(log *logrus.Entry, exporter *ipfixExporter.ExportingProcess) (uint16, []entities.InfoElementWithValue, error) {
templateID := exporter.NewTemplateID() templateID := exporter.NewTemplateID()
templateSet := entities.NewSet(false) templateSet := entities.NewSet(false)
...@@ -84,35 +120,10 @@ func SendTemplateRecordv4(log *logrus.Entry, exporter *ipfixExporter.ExportingPr ...@@ -84,35 +120,10 @@ func SendTemplateRecordv4(log *logrus.Entry, exporter *ipfixExporter.ExportingPr
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
err = addElementToTemplate(log, "octetDeltaCount", nil, &elements) err = AddRecordValuesToTemplate(log, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "flowStartSeconds", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "flowStartMilliseconds", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "flowEndSeconds", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "flowEndMilliseconds", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "packetDeltaCount", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "interfaceName", nil, &elements)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
err = templateSet.AddRecord(elements, templateID) err = templateSet.AddRecord(elements, templateID)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
...@@ -170,31 +181,7 @@ func SendTemplateRecordv6(log *logrus.Entry, exporter *ipfixExporter.ExportingPr ...@@ -170,31 +181,7 @@ func SendTemplateRecordv6(log *logrus.Entry, exporter *ipfixExporter.ExportingPr
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
err = addElementToTemplate(log, "octetDeltaCount", nil, &elements) err = AddRecordValuesToTemplate(log, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "flowStartSeconds", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "flowStartMilliseconds", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "flowEndSeconds", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "flowEndMilliseconds", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "packetDeltaCount", nil, &elements)
if err != nil {
return 0, nil, err
}
err = addElementToTemplate(log, "interfaceName", nil, &elements)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
...@@ -262,6 +249,27 @@ func setIPv4Address(ieValPtr *entities.InfoElementWithValue, ipAddress net.IP) { ...@@ -262,6 +249,27 @@ func setIPv4Address(ieValPtr *entities.InfoElementWithValue, ipAddress net.IP) {
ieVal.SetIPAddressValue(ipAddress) ieVal.SetIPAddressValue(ipAddress)
} }
} }
func setIERecordValue(record *flow.Record, ieValPtr *entities.InfoElementWithValue) {
ieVal := *ieValPtr
switch ieVal.GetName() {
case "octetDeltaCount":
ieVal.SetUnsigned64Value(record.Bytes)
case "tcpControlBits":
ieVal.SetUnsigned16Value(record.Flags)
case "flowStartSeconds":
ieVal.SetUnsigned32Value(uint32(record.TimeFlowStart.Unix()))
case "flowStartMilliseconds":
ieVal.SetUnsigned64Value(uint64(record.TimeFlowStart.UnixMilli()))
case "flowEndSeconds":
ieVal.SetUnsigned32Value(uint32(record.TimeFlowEnd.Unix()))
case "flowEndMilliseconds":
ieVal.SetUnsigned64Value(uint64(record.TimeFlowEnd.UnixMilli()))
case "packetDeltaCount":
ieVal.SetUnsigned64Value(uint64(record.Packets))
case "interfaceName":
ieVal.SetStringValue(record.Interface)
}
}
func setIEValue(record *flow.Record, ieValPtr *entities.InfoElementWithValue) { func setIEValue(record *flow.Record, ieValPtr *entities.InfoElementWithValue) {
ieVal := *ieValPtr ieVal := *ieValPtr
switch ieVal.GetName() { switch ieVal.GetName() {
...@@ -289,26 +297,12 @@ func setIEValue(record *flow.Record, ieValPtr *entities.InfoElementWithValue) { ...@@ -289,26 +297,12 @@ func setIEValue(record *flow.Record, ieValPtr *entities.InfoElementWithValue) {
ieVal.SetUnsigned16Value(record.Transport.SrcPort) ieVal.SetUnsigned16Value(record.Transport.SrcPort)
case "destinationTransportPort": case "destinationTransportPort":
ieVal.SetUnsigned16Value(record.Transport.DstPort) ieVal.SetUnsigned16Value(record.Transport.DstPort)
case "octetDeltaCount":
ieVal.SetUnsigned64Value(record.Bytes)
case "flowStartSeconds":
ieVal.SetUnsigned32Value(uint32(record.TimeFlowStart.Unix()))
case "flowStartMilliseconds":
ieVal.SetUnsigned64Value(uint64(record.TimeFlowStart.UnixMilli()))
case "flowEndSeconds":
ieVal.SetUnsigned32Value(uint32(record.TimeFlowEnd.Unix()))
case "flowEndMilliseconds":
ieVal.SetUnsigned64Value(uint64(record.TimeFlowEnd.UnixMilli()))
case "packetDeltaCount":
ieVal.SetUnsigned64Value(uint64(record.Packets))
case "interfaceName":
ieVal.SetStringValue(record.Interface)
} }
} }
func setEntities(record *flow.Record, elements *[]entities.InfoElementWithValue) { func setEntities(record *flow.Record, elements *[]entities.InfoElementWithValue) {
for _, ieVal := range *elements { for _, ieVal := range *elements {
setIEValue(record, &ieVal) setIEValue(record, &ieVal)
setIERecordValue(record, &ieVal)
} }
} }
func (ipf *IPFIX) sendDataRecord(log *logrus.Entry, record *flow.Record, v6 bool) error { func (ipf *IPFIX) sendDataRecord(log *logrus.Entry, record *flow.Record, v6 bool) error {
......
...@@ -24,7 +24,7 @@ func IPAddrFromNetIP(netIP net.IP) flow.IPAddr { ...@@ -24,7 +24,7 @@ func IPAddrFromNetIP(netIP net.IP) flow.IPAddr {
func TestProtoConversion(t *testing.T) { func TestProtoConversion(t *testing.T) {
wc := writerCapturer{} wc := writerCapturer{}
kj := KafkaProto{Writer: &wc} kj := KafkaProto{Writer: &wc}
input := make(chan []*flow.Record, 10) input := make(chan []*flow.Record, 11)
record := flow.Record{} record := flow.Record{}
record.EthProtocol = 3 record.EthProtocol = 3
record.Direction = 1 record.Direction = 1
...@@ -39,6 +39,7 @@ func TestProtoConversion(t *testing.T) { ...@@ -39,6 +39,7 @@ func TestProtoConversion(t *testing.T) {
record.TimeFlowEnd = time.Now() record.TimeFlowEnd = time.Now()
record.Bytes = 789 record.Bytes = 789
record.Packets = 987 record.Packets = 987
record.Flags = uint16(1)
record.Interface = "veth0" record.Interface = "veth0"
input <- []*flow.Record{&record} input <- []*flow.Record{&record}
...@@ -61,6 +62,7 @@ func TestProtoConversion(t *testing.T) { ...@@ -61,6 +62,7 @@ func TestProtoConversion(t *testing.T) {
assert.Equal(t, record.TimeFlowEnd.UnixMilli(), r.TimeFlowEnd.AsTime().UnixMilli()) assert.Equal(t, record.TimeFlowEnd.UnixMilli(), r.TimeFlowEnd.AsTime().UnixMilli())
assert.EqualValues(t, 789, r.Bytes) assert.EqualValues(t, 789, r.Bytes)
assert.EqualValues(t, 987, r.Packets) assert.EqualValues(t, 987, r.Packets)
assert.EqualValues(t, uint16(1), r.Flags)
assert.Equal(t, "veth0", r.Interface) assert.Equal(t, "veth0", r.Interface)
} }
......
...@@ -64,9 +64,10 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record { ...@@ -64,9 +64,10 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
Nanos: int32(fr.TimeFlowEnd.Nanosecond()), Nanos: int32(fr.TimeFlowEnd.Nanosecond()),
}, },
Packets: uint64(fr.Packets), Packets: uint64(fr.Packets),
Interface: fr.Interface,
Duplicate: fr.Duplicate, Duplicate: fr.Duplicate,
AgentIp: agentIP(fr.AgentIP), AgentIp: agentIP(fr.AgentIP),
Flags: uint32(fr.Flags),
Interface: string(fr.Interface),
} }
} }
...@@ -97,6 +98,7 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record { ...@@ -97,6 +98,7 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
Nanos: int32(fr.TimeFlowEnd.Nanosecond()), Nanos: int32(fr.TimeFlowEnd.Nanosecond()),
}, },
Packets: uint64(fr.Packets), Packets: uint64(fr.Packets),
Flags: uint32(fr.Flags),
Interface: fr.Interface, Interface: fr.Interface,
Duplicate: fr.Duplicate, Duplicate: fr.Duplicate,
AgentIp: agentIP(fr.AgentIP), AgentIp: agentIP(fr.AgentIP),
......
...@@ -55,19 +55,19 @@ func TestEvict_MaxEntries(t *testing.T) { ...@@ -55,19 +55,19 @@ func TestEvict_MaxEntries(t *testing.T) {
inputs <- &RawRecord{ inputs <- &RawRecord{
RecordKey: k1, RecordKey: k1,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 123, Packets: 1, StartMonoTimeNs: 123, EndMonoTimeNs: 123, Bytes: 123, Packets: 1, StartMonoTimeNs: 123, EndMonoTimeNs: 123, Flags: 1,
}, },
} }
inputs <- &RawRecord{ inputs <- &RawRecord{
RecordKey: k2, RecordKey: k2,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 456, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, Bytes: 456, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, Flags: 1,
}, },
} }
inputs <- &RawRecord{ inputs <- &RawRecord{
RecordKey: k1, RecordKey: k1,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 321, Packets: 1, StartMonoTimeNs: 789, EndMonoTimeNs: 789, Bytes: 321, Packets: 1, StartMonoTimeNs: 789, EndMonoTimeNs: 789, Flags: 1,
}, },
} }
requireNoEviction(t, evictor) requireNoEviction(t, evictor)
...@@ -76,7 +76,7 @@ func TestEvict_MaxEntries(t *testing.T) { ...@@ -76,7 +76,7 @@ func TestEvict_MaxEntries(t *testing.T) {
inputs <- &RawRecord{ inputs <- &RawRecord{
RecordKey: k3, RecordKey: k3,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 111, Packets: 1, StartMonoTimeNs: 888, EndMonoTimeNs: 888, Bytes: 111, Packets: 1, StartMonoTimeNs: 888, EndMonoTimeNs: 888, Flags: 1,
}, },
} }
...@@ -96,7 +96,7 @@ func TestEvict_MaxEntries(t *testing.T) { ...@@ -96,7 +96,7 @@ func TestEvict_MaxEntries(t *testing.T) {
RawRecord: RawRecord{ RawRecord: RawRecord{
RecordKey: k1, RecordKey: k1,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 444, Packets: 2, StartMonoTimeNs: 123, EndMonoTimeNs: 789, Bytes: 444, Packets: 2, StartMonoTimeNs: 123, EndMonoTimeNs: 789, Flags: 1,
}, },
}, },
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond), TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
...@@ -106,7 +106,7 @@ func TestEvict_MaxEntries(t *testing.T) { ...@@ -106,7 +106,7 @@ func TestEvict_MaxEntries(t *testing.T) {
RawRecord: RawRecord{ RawRecord: RawRecord{
RecordKey: k2, RecordKey: k2,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 456, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, Bytes: 456, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, Flags: 1,
}, },
}, },
TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond), TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond),
...@@ -132,19 +132,19 @@ func TestEvict_Period(t *testing.T) { ...@@ -132,19 +132,19 @@ func TestEvict_Period(t *testing.T) {
inputs <- &RawRecord{ inputs <- &RawRecord{
RecordKey: k1, RecordKey: k1,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 10, Packets: 1, StartMonoTimeNs: 123, EndMonoTimeNs: 123, Bytes: 10, Packets: 1, StartMonoTimeNs: 123, EndMonoTimeNs: 123, Flags: 1,
}, },
} }
inputs <- &RawRecord{ inputs <- &RawRecord{
RecordKey: k1, RecordKey: k1,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 10, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, Bytes: 10, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, Flags: 1,
}, },
} }
inputs <- &RawRecord{ inputs <- &RawRecord{
RecordKey: k1, RecordKey: k1,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 10, Packets: 1, StartMonoTimeNs: 789, EndMonoTimeNs: 789, Bytes: 10, Packets: 1, StartMonoTimeNs: 789, EndMonoTimeNs: 789, Flags: 1,
}, },
} }
// Forcing at least one eviction here // Forcing at least one eviction here
...@@ -152,13 +152,13 @@ func TestEvict_Period(t *testing.T) { ...@@ -152,13 +152,13 @@ func TestEvict_Period(t *testing.T) {
inputs <- &RawRecord{ inputs <- &RawRecord{
RecordKey: k1, RecordKey: k1,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 10, Packets: 1, StartMonoTimeNs: 1123, EndMonoTimeNs: 1123, Bytes: 10, Packets: 1, StartMonoTimeNs: 1123, EndMonoTimeNs: 1123, Flags: 1,
}, },
} }
inputs <- &RawRecord{ inputs <- &RawRecord{
RecordKey: k1, RecordKey: k1,
RecordMetrics: RecordMetrics{ RecordMetrics: RecordMetrics{
Bytes: 10, Packets: 1, StartMonoTimeNs: 1456, EndMonoTimeNs: 1456, Bytes: 10, Packets: 1, StartMonoTimeNs: 1456, EndMonoTimeNs: 1456, Flags: 1,
}, },
} }
...@@ -174,6 +174,7 @@ func TestEvict_Period(t *testing.T) { ...@@ -174,6 +174,7 @@ func TestEvict_Period(t *testing.T) {
Packets: 3, Packets: 3,
StartMonoTimeNs: 123, StartMonoTimeNs: 123,
EndMonoTimeNs: 789, EndMonoTimeNs: 789,
Flags: 1,
}, },
}, },
TimeFlowStart: now.Add(-1000 + 123), TimeFlowStart: now.Add(-1000 + 123),
...@@ -189,6 +190,7 @@ func TestEvict_Period(t *testing.T) { ...@@ -189,6 +190,7 @@ func TestEvict_Period(t *testing.T) {
Packets: 2, Packets: 2,
StartMonoTimeNs: 1123, StartMonoTimeNs: 1123,
EndMonoTimeNs: 1456, EndMonoTimeNs: 1456,
Flags: 1,
}, },
}, },
TimeFlowStart: now.Add(-1000 + 1123), TimeFlowStart: now.Add(-1000 + 1123),
......
...@@ -13,26 +13,26 @@ var ( ...@@ -13,26 +13,26 @@ var (
EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 123, DstPort: 456}, EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 123, DstPort: 456},
DataLink: DataLink{DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}}, IFIndex: 1, DataLink: DataLink{DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}}, IFIndex: 1,
}, RecordMetrics: RecordMetrics{ }, RecordMetrics: RecordMetrics{
Packets: 2, Bytes: 456, Packets: 2, Bytes: 456, Flags: 1,
}}, Interface: "eth0"} }}, Interface: "eth0"}
oneIf2 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{ oneIf2 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{
EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 123, DstPort: 456}, EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 123, DstPort: 456},
DataLink: DataLink{DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}}, IFIndex: 2, DataLink: DataLink{DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}}, IFIndex: 2,
}, RecordMetrics: RecordMetrics{ }, RecordMetrics: RecordMetrics{
Packets: 2, Bytes: 456, Packets: 2, Bytes: 456, Flags: 1,
}}, Interface: "123456789"} }}, Interface: "123456789"}
// another fow from 2 different interfaces and directions // another fow from 2 different interfaces and directions
twoIf1 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{ twoIf1 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{
EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 333, DstPort: 456}, EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 333, DstPort: 456},
DataLink: DataLink{DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}}, IFIndex: 1, DataLink: DataLink{DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}}, IFIndex: 1,
}, RecordMetrics: RecordMetrics{ }, RecordMetrics: RecordMetrics{
Packets: 2, Bytes: 456, Packets: 2, Bytes: 456, Flags: 1,
}}, Interface: "eth0"} }}, Interface: "eth0"}
twoIf2 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{ twoIf2 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{
EthProtocol: 1, Direction: 0, Transport: Transport{SrcPort: 333, DstPort: 456}, EthProtocol: 1, Direction: 0, Transport: Transport{SrcPort: 333, DstPort: 456},
DataLink: DataLink{DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}}, IFIndex: 2, DataLink: DataLink{DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}}, IFIndex: 2,
}, RecordMetrics: RecordMetrics{ }, RecordMetrics: RecordMetrics{
Packets: 2, Bytes: 456, Packets: 2, Bytes: 456, Flags: 1,
}}, Interface: "123456789"} }}, Interface: "123456789"}
) )
......
...@@ -65,8 +65,8 @@ type RecordMetrics struct { ...@@ -65,8 +65,8 @@ type RecordMetrics struct {
// and monotime.Now() (user space) // and monotime.Now() (user space)
StartMonoTimeNs uint64 StartMonoTimeNs uint64
EndMonoTimeNs uint64 EndMonoTimeNs uint64
Flags uint16
Errno uint8 Errno uint8
} }
// record structure as parsed from eBPF // record structure as parsed from eBPF
...@@ -124,6 +124,7 @@ func (r *RecordMetrics) Accumulate(src *RecordMetrics) { ...@@ -124,6 +124,7 @@ func (r *RecordMetrics) Accumulate(src *RecordMetrics) {
} }
r.Bytes += src.Bytes r.Bytes += src.Bytes
r.Packets += src.Packets r.Packets += src.Packets
r.Flags |= src.Flags
} }
// IP returns the net.IP equivalent object // IP returns the net.IP equivalent object
......
...@@ -26,7 +26,9 @@ func TestRecordBinaryEncoding(t *testing.T) { ...@@ -26,7 +26,9 @@ func TestRecordBinaryEncoding(t *testing.T) {
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 bytes 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 bytes
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_start_time 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_start_time
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_end_time 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_end_time
0x13, 0x14, //flags
0x33, // u8 errno 0x33, // u8 errno
})) }))
require.NoError(t, err) require.NoError(t, err)
...@@ -54,6 +56,7 @@ func TestRecordBinaryEncoding(t *testing.T) { ...@@ -54,6 +56,7 @@ func TestRecordBinaryEncoding(t *testing.T) {
Bytes: 0x1a19181716151413, Bytes: 0x1a19181716151413,
StartMonoTimeNs: 0x1a19181716151413, StartMonoTimeNs: 0x1a19181716151413,
EndMonoTimeNs: 0x1a19181716151413, EndMonoTimeNs: 0x1a19181716151413,
Flags: 0x1413,
Errno: 0x33, Errno: 0x33,
}, },
}, *fr) }, *fr)
......
...@@ -14,23 +14,23 @@ func TestPacketAggregation(t *testing.T) { ...@@ -14,23 +14,23 @@ func TestPacketAggregation(t *testing.T) {
} }
tcs := []testCase{{ tcs := []testCase{{
input: []RecordMetrics{ input: []RecordMetrics{
{Packets: 0, Bytes: 0, StartMonoTimeNs: 0, EndMonoTimeNs: 0}, {Packets: 0, Bytes: 0, StartMonoTimeNs: 0, EndMonoTimeNs: 0, Flags: 1},
{Packets: 0x7, Bytes: 0x22d, StartMonoTimeNs: 0x176a790b240b, EndMonoTimeNs: 0x176a792a755b}, {Packets: 0x7, Bytes: 0x22d, StartMonoTimeNs: 0x176a790b240b, EndMonoTimeNs: 0x176a792a755b, Flags: 1},
{Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0}, {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0, Flags: 1},
{Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0}, {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0, Flags: 1},
}, },
expected: RecordMetrics{ expected: RecordMetrics{
Packets: 0x7, Bytes: 0x22d, StartMonoTimeNs: 0x176a790b240b, EndMonoTimeNs: 0x176a792a755b, Packets: 0x7, Bytes: 0x22d, StartMonoTimeNs: 0x176a790b240b, EndMonoTimeNs: 0x176a792a755b, Flags: 1,
}, },
}, { }, {
input: []RecordMetrics{ input: []RecordMetrics{
{Packets: 0x3, Bytes: 0x5c4, StartMonoTimeNs: 0x17f3e9613a7f, EndMonoTimeNs: 0x17f3e979816e}, {Packets: 0x3, Bytes: 0x5c4, StartMonoTimeNs: 0x17f3e9613a7f, EndMonoTimeNs: 0x17f3e979816e, Flags: 1},
{Packets: 0x2, Bytes: 0x8c, StartMonoTimeNs: 0x17f3e9633a7f, EndMonoTimeNs: 0x17f3e96f164e}, {Packets: 0x2, Bytes: 0x8c, StartMonoTimeNs: 0x17f3e9633a7f, EndMonoTimeNs: 0x17f3e96f164e, Flags: 1},
{Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0}, {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0, Flags: 1},
{Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0}, {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0, Flags: 1},
}, },
expected: RecordMetrics{ expected: RecordMetrics{
Packets: 0x5, Bytes: 0x5c4 + 0x8c, StartMonoTimeNs: 0x17f3e9613a7f, EndMonoTimeNs: 0x17f3e979816e, Packets: 0x5, Bytes: 0x5c4 + 0x8c, StartMonoTimeNs: 0x17f3e9613a7f, EndMonoTimeNs: 0x17f3e979816e, Flags: 1,
}, },
}} }}
ft := MapTracer{} ft := MapTracer{}
......
...@@ -29,7 +29,7 @@ func TestGRPCCommunication(t *testing.T) { ...@@ -29,7 +29,7 @@ func TestGRPCCommunication(t *testing.T) {
go func() { go func() {
_, err = client.Send(context.Background(), _, err = client.Send(context.Background(),
&pbflow.Records{Entries: []*pbflow.Record{{ &pbflow.Records{Entries: []*pbflow.Record{{
EthProtocol: 123, Bytes: 456, Network: &pbflow.Network{ EthProtocol: 123, Flags: 1, Bytes: 456, Network: &pbflow.Network{
SrcAddr: &pbflow.IP{ SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344}, IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344},
}, },
...@@ -43,7 +43,7 @@ func TestGRPCCommunication(t *testing.T) { ...@@ -43,7 +43,7 @@ func TestGRPCCommunication(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
_, err = client.Send(context.Background(), _, err = client.Send(context.Background(),
&pbflow.Records{Entries: []*pbflow.Record{{ &pbflow.Records{Entries: []*pbflow.Record{{
EthProtocol: 789, Bytes: 101, Network: &pbflow.Network{ EthProtocol: 789, Flags: 1, Bytes: 101, Network: &pbflow.Network{
SrcAddr: &pbflow.IP{ SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x44332211}, IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x44332211},
}, },
...@@ -66,6 +66,7 @@ func TestGRPCCommunication(t *testing.T) { ...@@ -66,6 +66,7 @@ func TestGRPCCommunication(t *testing.T) {
assert.Len(t, rs.Entries, 1) assert.Len(t, rs.Entries, 1)
r := rs.Entries[0] r := rs.Entries[0]
assert.EqualValues(t, 123, r.EthProtocol) assert.EqualValues(t, 123, r.EthProtocol)
assert.EqualValues(t, 1, r.Flags)
assert.EqualValues(t, 456, r.Bytes) assert.EqualValues(t, 456, r.Bytes)
assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4()) assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4())
assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4()) assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4())
...@@ -78,6 +79,7 @@ func TestGRPCCommunication(t *testing.T) { ...@@ -78,6 +79,7 @@ func TestGRPCCommunication(t *testing.T) {
assert.Len(t, rs.Entries, 1) assert.Len(t, rs.Entries, 1)
r = rs.Entries[0] r = rs.Entries[0]
assert.EqualValues(t, 789, r.EthProtocol) assert.EqualValues(t, 789, r.EthProtocol)
assert.EqualValues(t, 1, r.Flags)
assert.EqualValues(t, 101, r.Bytes) assert.EqualValues(t, 101, r.Bytes)
assert.EqualValues(t, 0x44332211, r.GetNetwork().GetSrcAddr().GetIpv4()) assert.EqualValues(t, 0x44332211, r.GetNetwork().GetSrcAddr().GetIpv4())
assert.EqualValues(t, uint64(0x88776655), r.GetNetwork().GetDstAddr().GetIpv4()) assert.EqualValues(t, uint64(0x88776655), r.GetNetwork().GetDstAddr().GetIpv4())
...@@ -114,7 +116,7 @@ func TestConstructorOptions(t *testing.T) { ...@@ -114,7 +116,7 @@ func TestConstructorOptions(t *testing.T) {
go func() { go func() {
_, err = client.Send(context.Background(), _, err = client.Send(context.Background(),
&pbflow.Records{Entries: []*pbflow.Record{{EthProtocol: 123, Bytes: 456}}}) &pbflow.Records{Entries: []*pbflow.Record{{EthProtocol: 123, Bytes: 456, Flags: 1}}})
require.NoError(t, err) require.NoError(t, err)
}() }()
...@@ -140,6 +142,7 @@ func BenchmarkGRPCCommunication(b *testing.B) { ...@@ -140,6 +142,7 @@ func BenchmarkGRPCCommunication(b *testing.B) {
f := &pbflow.Record{ f := &pbflow.Record{
EthProtocol: 2048, EthProtocol: 2048,
Bytes: 456, Bytes: 456,
Flags: 1,
Direction: pbflow.Direction_EGRESS, Direction: pbflow.Direction_EGRESS,
TimeFlowStart: timestamppb.Now(), TimeFlowStart: timestamppb.Now(),
TimeFlowEnd: timestamppb.Now(), TimeFlowEnd: timestamppb.Now(),
......
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.28.1 // protoc-gen-go v1.27.1
// protoc v3.19.4 // protoc v3.12.4
// source: proto/flow.proto // source: proto/flow.proto
package pbflow package pbflow
import ( import (
timestamp "github.com/golang/protobuf/ptypes/timestamp"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect" reflect "reflect"
sync "sync" sync "sync"
) )
...@@ -162,10 +162,10 @@ type Record struct { ...@@ -162,10 +162,10 @@ type Record struct {
// protocol as defined by ETH_P_* in linux/if_ether.h // protocol as defined by ETH_P_* in linux/if_ether.h
// https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_ether.h // https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_ether.h
EthProtocol uint32 `protobuf:"varint,1,opt,name=eth_protocol,json=ethProtocol,proto3" json:"eth_protocol,omitempty"` EthProtocol uint32 `protobuf:"varint,1,opt,name=eth_protocol,json=ethProtocol,proto3" json:"eth_protocol,omitempty"`
Direction Direction `protobuf:"varint,2,opt,name=direction,proto3,enum=pbflow.Direction" json:"direction,omitempty"` Direction Direction `protobuf:"varint,2,opt,name=direction,proto3,enum=pbflow.Direction" json:"direction,omitempty"`
TimeFlowStart *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=time_flow_start,json=timeFlowStart,proto3" json:"time_flow_start,omitempty"` TimeFlowStart *timestamp.Timestamp `protobuf:"bytes,3,opt,name=time_flow_start,json=timeFlowStart,proto3" json:"time_flow_start,omitempty"`
TimeFlowEnd *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=time_flow_end,json=timeFlowEnd,proto3" json:"time_flow_end,omitempty"` TimeFlowEnd *timestamp.Timestamp `protobuf:"bytes,4,opt,name=time_flow_end,json=timeFlowEnd,proto3" json:"time_flow_end,omitempty"`
// OSI-layer attributes // OSI-layer attributes
DataLink *DataLink `protobuf:"bytes,5,opt,name=data_link,json=dataLink,proto3" json:"data_link,omitempty"` DataLink *DataLink `protobuf:"bytes,5,opt,name=data_link,json=dataLink,proto3" json:"data_link,omitempty"`
Network *Network `protobuf:"bytes,6,opt,name=network,proto3" json:"network,omitempty"` Network *Network `protobuf:"bytes,6,opt,name=network,proto3" json:"network,omitempty"`
...@@ -177,7 +177,8 @@ type Record struct { ...@@ -177,7 +177,8 @@ type Record struct {
// From all the duplicate flows, one will set this value to false and the rest will be true. // From all the duplicate flows, one will set this value to false and the rest will be true.
Duplicate bool `protobuf:"varint,11,opt,name=duplicate,proto3" json:"duplicate,omitempty"` Duplicate bool `protobuf:"varint,11,opt,name=duplicate,proto3" json:"duplicate,omitempty"`
// Agent IP address to help identifying the source of the flow // Agent IP address to help identifying the source of the flow
AgentIp *IP `protobuf:"bytes,12,opt,name=agent_ip,json=agentIp,proto3" json:"agent_ip,omitempty"` AgentIp *IP `protobuf:"bytes,12,opt,name=agent_ip,json=agentIp,proto3" json:"agent_ip,omitempty"`
Flags uint32 `protobuf:"varint,13,opt,name=flags,proto3" json:"flags,omitempty"`
} }
func (x *Record) Reset() { func (x *Record) Reset() {
...@@ -226,14 +227,14 @@ func (x *Record) GetDirection() Direction { ...@@ -226,14 +227,14 @@ func (x *Record) GetDirection() Direction {
return Direction_INGRESS return Direction_INGRESS
} }
func (x *Record) GetTimeFlowStart() *timestamppb.Timestamp { func (x *Record) GetTimeFlowStart() *timestamp.Timestamp {
if x != nil { if x != nil {
return x.TimeFlowStart return x.TimeFlowStart
} }
return nil return nil
} }
func (x *Record) GetTimeFlowEnd() *timestamppb.Timestamp { func (x *Record) GetTimeFlowEnd() *timestamp.Timestamp {
if x != nil { if x != nil {
return x.TimeFlowEnd return x.TimeFlowEnd
} }
...@@ -296,6 +297,13 @@ func (x *Record) GetAgentIp() *IP { ...@@ -296,6 +297,13 @@ func (x *Record) GetAgentIp() *IP {
return nil return nil
} }
func (x *Record) GetFlags() uint32 {
if x != nil {
return x.Flags
}
return 0
}
type DataLink struct { type DataLink struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
...@@ -562,7 +570,7 @@ var file_proto_flow_proto_rawDesc = []byte{ ...@@ -562,7 +570,7 @@ var file_proto_flow_proto_rawDesc = []byte{
0x07, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x28, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x07, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x28, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72,
0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x62, 0x66, 0x6c,
0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69,
0x65, 0x73, 0x22, 0xfe, 0x03, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x21, 0x0a, 0x65, 0x73, 0x22, 0x94, 0x04, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x21, 0x0a,
0x0c, 0x65, 0x74, 0x68, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x01, 0x20, 0x0c, 0x65, 0x74, 0x68, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0d, 0x52, 0x0b, 0x65, 0x74, 0x68, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x65, 0x74, 0x68, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c,
0x12, 0x2f, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x12, 0x2f, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20,
...@@ -594,34 +602,35 @@ var file_proto_flow_proto_rawDesc = []byte{ ...@@ -594,34 +602,35 @@ var file_proto_flow_proto_rawDesc = []byte{
0x08, 0x52, 0x09, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x08, 0x08, 0x52, 0x09, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x08,
0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a,
0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e,
0x74, 0x49, 0x70, 0x22, 0x3c, 0x0a, 0x08, 0x44, 0x61, 0x74, 0x61, 0x4c, 0x69, 0x6e, 0x6b, 0x12, 0x74, 0x49, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x0d, 0x20, 0x01,
0x17, 0x0a, 0x07, 0x73, 0x72, 0x63, 0x5f, 0x6d, 0x61, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x28, 0x0d, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x22, 0x3c, 0x0a, 0x08, 0x44, 0x61, 0x74,
0x52, 0x06, 0x73, 0x72, 0x63, 0x4d, 0x61, 0x63, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x5f, 0x61, 0x4c, 0x69, 0x6e, 0x6b, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x72, 0x63, 0x5f, 0x6d, 0x61, 0x63,
0x6d, 0x61, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x64, 0x73, 0x74, 0x4d, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x73, 0x72, 0x63, 0x4d, 0x61, 0x63, 0x12, 0x17,
0x63, 0x22, 0x57, 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x25, 0x0a, 0x08, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x5f, 0x6d, 0x61, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52,
0x73, 0x72, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x06, 0x64, 0x73, 0x74, 0x4d, 0x61, 0x63, 0x22, 0x57, 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f,
0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x73, 0x72, 0x63, 0x41, 0x72, 0x6b, 0x12, 0x25, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01,
0x64, 0x64, 0x72, 0x12, 0x25, 0x0a, 0x08, 0x64, 0x73, 0x74, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50,
0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x52, 0x07, 0x73, 0x72, 0x63, 0x41, 0x64, 0x64, 0x72, 0x12, 0x25, 0x0a, 0x08, 0x64, 0x73, 0x74,
0x50, 0x52, 0x07, 0x64, 0x73, 0x74, 0x41, 0x64, 0x64, 0x72, 0x22, 0x3d, 0x0a, 0x02, 0x49, 0x50, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62,
0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x34, 0x18, 0x01, 0x20, 0x01, 0x28, 0x07, 0x48, 0x00, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x64, 0x73, 0x74, 0x41, 0x64, 0x64, 0x72,
0x52, 0x04, 0x69, 0x70, 0x76, 0x34, 0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x36, 0x18, 0x02, 0x22, 0x3d, 0x0a, 0x02, 0x49, 0x50, 0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x34, 0x18, 0x01,
0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x04, 0x69, 0x70, 0x76, 0x36, 0x42, 0x0b, 0x0a, 0x09, 0x20, 0x01, 0x28, 0x07, 0x48, 0x00, 0x52, 0x04, 0x69, 0x70, 0x76, 0x34, 0x12, 0x14, 0x0a, 0x04,
0x69, 0x70, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x22, 0x5d, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x69, 0x70, 0x76, 0x36, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x04, 0x69, 0x70,
0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x5f, 0x70, 0x6f, 0x76, 0x36, 0x42, 0x0b, 0x0a, 0x09, 0x69, 0x70, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x22,
0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x73, 0x72, 0x63, 0x50, 0x6f, 0x72, 0x5d, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x19, 0x0a, 0x08,
0x74, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x73, 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x73, 0x72, 0x63, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07,
0x01, 0x28, 0x0d, 0x52, 0x07, 0x64, 0x73, 0x74, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x73, 0x74, 0x5f, 0x70,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x64, 0x73, 0x74, 0x50, 0x6f,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2a, 0x24, 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x03,
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2a, 0x24,
0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x45, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x01, 0x32, 0x3e, 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0b, 0x0a, 0x07, 0x49,
0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x31, 0x0a, 0x04, 0x53, 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x45, 0x47, 0x52, 0x45,
0x65, 0x6e, 0x64, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x53, 0x53, 0x10, 0x01, 0x32, 0x3e, 0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f,
0x6f, 0x72, 0x64, 0x73, 0x1a, 0x16, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x43, 0x6f, 0x72, 0x12, 0x31, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x66, 0x6c,
0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x16, 0x2e, 0x70, 0x62, 0x66,
0x5a, 0x08, 0x2e, 0x2f, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6c, 0x6f, 0x77, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70,
0x6f, 0x33, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
...@@ -639,15 +648,15 @@ func file_proto_flow_proto_rawDescGZIP() []byte { ...@@ -639,15 +648,15 @@ func file_proto_flow_proto_rawDescGZIP() []byte {
var file_proto_flow_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_proto_flow_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_proto_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_proto_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_proto_flow_proto_goTypes = []interface{}{ var file_proto_flow_proto_goTypes = []interface{}{
(Direction)(0), // 0: pbflow.Direction (Direction)(0), // 0: pbflow.Direction
(*CollectorReply)(nil), // 1: pbflow.CollectorReply (*CollectorReply)(nil), // 1: pbflow.CollectorReply
(*Records)(nil), // 2: pbflow.Records (*Records)(nil), // 2: pbflow.Records
(*Record)(nil), // 3: pbflow.Record (*Record)(nil), // 3: pbflow.Record
(*DataLink)(nil), // 4: pbflow.DataLink (*DataLink)(nil), // 4: pbflow.DataLink
(*Network)(nil), // 5: pbflow.Network (*Network)(nil), // 5: pbflow.Network
(*IP)(nil), // 6: pbflow.IP (*IP)(nil), // 6: pbflow.IP
(*Transport)(nil), // 7: pbflow.Transport (*Transport)(nil), // 7: pbflow.Transport
(*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp (*timestamp.Timestamp)(nil), // 8: google.protobuf.Timestamp
} }
var file_proto_flow_proto_depIdxs = []int32{ var file_proto_flow_proto_depIdxs = []int32{
3, // 0: pbflow.Records.entries:type_name -> pbflow.Record 3, // 0: pbflow.Records.entries:type_name -> pbflow.Record
......
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions: // versions:
// - protoc-gen-go-grpc v1.2.0 // - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4 // - protoc v3.12.4
// source: proto/flow.proto // source: proto/flow.proto
package pbflow package pbflow
......
...@@ -40,6 +40,7 @@ message Record { ...@@ -40,6 +40,7 @@ message Record {
// Agent IP address to help identifying the source of the flow // Agent IP address to help identifying the source of the flow
IP agent_ip = 12; IP agent_ip = 12;
uint32 flags = 13;
} }
message DataLink { message DataLink {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment