diff --git a/bpf/.clang-format b/bpf/.clang-format new file mode 100644 index 0000000000000000000000000000000000000000..92af251e7c9d4b332bd5e3ae2ed4f4cd1ec78f8d --- /dev/null +++ b/bpf/.clang-format @@ -0,0 +1,9 @@ +{ + BasedOnStyle: LLVM, + AllowShortFunctionsOnASingleLine: InlineOnly, + ColumnLimit: 100, + IndentWidth: 4, + SortIncludes: false, + ReflowComments: false, + TabWidth: 4, +} diff --git a/bpf/flow.h b/bpf/flow.h index 1248838bfad0911b1f15f1ccfe0082edba3c72bb..af668b5ecf71a5f442ba0fa99f31a390128684aa 100644 --- a/bpf/flow.h +++ b/bpf/flow.h @@ -17,6 +17,8 @@ typedef struct flow_metrics_t { // as output from bpf_ktime_get_ns() u64 start_mono_time_ts; u64 end_mono_time_ts; + // TCP Flags from https://www.ietf.org/rfc/rfc793.txt + u16 flags; // The positive errno of a failed map insertion that caused a flow // to be sent via ringbuffer. // 0 otherwise diff --git a/bpf/flows.c b/bpf/flows.c index 790f24b8a0c4cd93c4f954b4b10843f2794c1714..72fa02aeecf1e25f89872f1c4b6f838dc4521ddb 100644 --- a/bpf/flows.c +++ b/bpf/flows.c @@ -26,7 +26,6 @@ #include <linux/udp.h> #include <linux/tcp.h> #include <string.h> - #include <stdbool.h> #include <linux/if_ether.h> @@ -42,6 +41,20 @@ #define INGRESS 0 #define EGRESS 1 +// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml +#define FIN_FLAG 0x01 +#define SYN_FLAG 0x02 +#define RST_FLAG 0x04 +#define PSH_FLAG 0x08 +#define ACK_FLAG 0x10 +#define URG_FLAG 0x20 +#define ECE_FLAG 0x40 +#define CWR_FLAG 0x80 +// Custom flags exported +#define SYN_ACK_FLAG 0x100 +#define FIN_ACK_FLAG 0x200 +#define RST_ACK_FLAG 0x400 + // Common Ringbuffer as a conduit for ingress/egress flows to userspace struct { __uint(type, BPF_MAP_TYPE_RINGBUF); @@ -62,8 +75,35 @@ volatile const u8 trace_messages = 0; const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; +// sets the TCP header flags for connection information +static inline void set_flags(struct tcphdr *th, u16 *flags) { + //If both ACK and SYN are set, then it is server -> client communication during 3-way handshake. + if (th->ack && th->syn) { + *flags |= SYN_ACK_FLAG; + } else if (th->ack && th->fin ) { + // If both ACK and FIN are set, then it is graceful termination from server. + *flags |= FIN_ACK_FLAG; + } else if (th->ack && th->rst ) { + // If both ACK and RST are set, then it is abrupt connection termination. + *flags |= RST_ACK_FLAG; + } else if (th->fin) { + *flags |= FIN_FLAG; + } else if (th->syn) { + *flags |= SYN_FLAG; + } else if (th->rst) { + *flags |= RST_FLAG; + } else if (th->psh) { + *flags |= PSH_FLAG; + } else if (th->urg) { + *flags |= URG_FLAG; + } else if (th->ece) { + *flags |= ECE_FLAG; + } else if (th->cwr) { + *flags |= CWR_FLAG; + } +} // sets flow fields from IPv4 header information -static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) { +static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) { if ((void *)ip + sizeof(*ip) > data_end) { return DISCARD; } @@ -81,6 +121,7 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) { if ((void *)tcp + sizeof(*tcp) <= data_end) { id->src_port = __bpf_ntohs(tcp->source); id->dst_port = __bpf_ntohs(tcp->dest); + set_flags(tcp, flags); } } break; case IPPROTO_UDP: { @@ -97,7 +138,7 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) { } // sets flow fields from IPv6 header information -static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) { +static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id, u16 *flags) { if ((void *)ip + sizeof(*ip) > data_end) { return DISCARD; } @@ -113,6 +154,7 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) { if ((void *)tcp + sizeof(*tcp) <= data_end) { id->src_port = __bpf_ntohs(tcp->source); id->dst_port = __bpf_ntohs(tcp->dest); + set_flags(tcp, flags); } } break; case IPPROTO_UDP: { @@ -128,7 +170,7 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) { return SUBMIT; } // sets flow fields from Ethernet header information -static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) { +static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u16 *flags) { if ((void *)eth + sizeof(*eth) > data_end) { return DISCARD; } @@ -138,15 +180,15 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) { if (id->eth_protocol == ETH_P_IP) { struct iphdr *ip = (void *)eth + sizeof(*eth); - return fill_iphdr(ip, data_end, id); + return fill_iphdr(ip, data_end, id, flags); } else if (id->eth_protocol == ETH_P_IPV6) { struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth); - return fill_ip6hdr(ip6, data_end, id); + return fill_ip6hdr(ip6, data_end, id, flags); } else { // TODO : Need to implement other specific ethertypes if needed // For now other parts of flow id remain zero - memset (&(id->src_ip),0, sizeof(struct in6_addr)); - memset (&(id->dst_ip),0, sizeof(struct in6_addr)); + memset(&(id->src_ip), 0, sizeof(struct in6_addr)); + memset(&(id->dst_ip), 0, sizeof(struct in6_addr)); id->transport_protocol = 0; id->src_port = 0; id->dst_port = 0; @@ -154,7 +196,6 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) { return SUBMIT; } - static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { // If sampling is defined, will only parse 1 out of "sampling" flows if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) { @@ -166,7 +207,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { flow_id id; u64 current_time = bpf_ktime_get_ns(); struct ethhdr *eth = data; - if (fill_ethhdr(eth, data_end, &id) == DISCARD) { + u16 flags = 0; + if (fill_ethhdr(eth, data_end, &id, &flags) == DISCARD) { return TC_ACT_OK; } id.if_index = skb->ifindex; @@ -184,7 +226,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { if (aggregate_flow->start_mono_time_ts == 0) { aggregate_flow->start_mono_time_ts = current_time; } - + aggregate_flow->flags |= flags; long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY); if (trace_messages && ret != 0) { // usually error -16 (-EBUSY) is printed here. @@ -198,9 +240,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { // Key does not exist in the map, and will need to create a new entry. flow_metrics new_flow = { .packets = 1, - .bytes=skb->len, + .bytes = skb->len, .start_mono_time_ts = current_time, .end_mono_time_ts = current_time, + .flags = flags, }; // even if we know that the entry is new, another CPU might be concurrently inserting a flow @@ -230,15 +273,14 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { } } return TC_ACT_OK; - } SEC("tc_ingress") -int ingress_flow_parse (struct __sk_buff *skb) { +int ingress_flow_parse(struct __sk_buff *skb) { return flow_monitor(skb, INGRESS); } SEC("tc_egress") -int egress_flow_parse (struct __sk_buff *skb) { +int egress_flow_parse(struct __sk_buff *skb) { return flow_monitor(skb, EGRESS); } char _license[] SEC("license") = "GPL"; diff --git a/examples/flowlogs-dump/server/flowlogs-dump-collector.go b/examples/flowlogs-dump/server/flowlogs-dump-collector.go index 26aff7fc97466a4c2300a603c9edbe9d445689a7..3d2c8ab96c85ad139cda0157ce55e920cf109634 100644 --- a/examples/flowlogs-dump/server/flowlogs-dump-collector.go +++ b/examples/flowlogs-dump/server/flowlogs-dump-collector.go @@ -61,7 +61,7 @@ func main() { log.SetFlags(0) flag.Parse() - receivedRecords := make(chan *pbflow.Records, 100) + receivedRecords := make(chan *pbflow.Records, 1000) log.Println("starting flowlogs-dump-collector on port", *port) go func() { _, err := grpc.StartCollector(*port, receivedRecords) @@ -72,7 +72,7 @@ func main() { for records := range receivedRecords { for _, record := range records.Entries { if record.EthProtocol == ipv6 { - log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d ends: %v\n", + log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d flags:%d ends: %v\n", ipProto[record.EthProtocol], record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"), record.Interface, @@ -84,10 +84,11 @@ func main() { record.Direction, record.Bytes, record.Packets, + record.Flags, record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"), ) } else { - log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d ends: %v\n", + log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d flags:%d ends: %v\n", ipProto[record.EthProtocol], record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"), record.Interface, @@ -99,6 +100,7 @@ func main() { record.Direction, record.Bytes, record.Packets, + record.Flags, record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"), ) } diff --git a/go.mod b/go.mod index 4df6d82abf5dfe9014420d8b34b19949da0ce4df..b06f7bfd38e0f15c7d3bb85336a7c6b63a785e2c 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/caarlos0/env/v6 v6.9.1 github.com/cilium/ebpf v0.8.1 github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 + github.com/golang/protobuf v1.5.2 github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f github.com/netobserv/gopipes v0.3.0 github.com/paulbellamy/ratecounter v0.2.0 @@ -35,7 +36,6 @@ require ( github.com/go-openapi/jsonreference v0.19.5 // indirect github.com/go-openapi/swag v0.19.14 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/go-cmp v0.5.7 // indirect github.com/google/gofuzz v1.1.0 // indirect diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o index ee6465770f750f986efe7ab54426cac2f1b26477..8f980bfe43902ca98206d83f9e1c41e70ad28c29 100644 Binary files a/pkg/ebpf/bpf_bpfeb.o and b/pkg/ebpf/bpf_bpfeb.o differ diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o index b28228617135f0ee03e3b7426a289ba71b98b12f..300e3452ad3b4b963cdf91ba9e7550ec1b03328e 100644 Binary files a/pkg/ebpf/bpf_bpfel.o and b/pkg/ebpf/bpf_bpfel.o differ diff --git a/pkg/exporter/ipfix.go b/pkg/exporter/ipfix.go index c6c33e6db26849cfeda3a02e935505561adf2325..ad059a73bcf44db3ee6cda162d55329f3ac305a9 100644 --- a/pkg/exporter/ipfix.go +++ b/pkg/exporter/ipfix.go @@ -39,6 +39,42 @@ func addElementToTemplate(log *logrus.Entry, elementName string, value []byte, e return nil } +func AddRecordValuesToTemplate(log *logrus.Entry, elements *[]entities.InfoElementWithValue) error { + err := addElementToTemplate(log, "octetDeltaCount", nil, elements) + if err != nil { + return err + } + err = addElementToTemplate(log, "tcpControlBits", nil, elements) + if err != nil { + return err + } + err = addElementToTemplate(log, "flowStartSeconds", nil, elements) + if err != nil { + return err + } + err = addElementToTemplate(log, "flowStartMilliseconds", nil, elements) + if err != nil { + return err + } + err = addElementToTemplate(log, "flowEndSeconds", nil, elements) + if err != nil { + return err + } + err = addElementToTemplate(log, "flowEndMilliseconds", nil, elements) + if err != nil { + return err + } + err = addElementToTemplate(log, "packetDeltaCount", nil, elements) + if err != nil { + return err + } + err = addElementToTemplate(log, "interfaceName", nil, elements) + if err != nil { + return err + } + return nil +} + func SendTemplateRecordv4(log *logrus.Entry, exporter *ipfixExporter.ExportingProcess) (uint16, []entities.InfoElementWithValue, error) { templateID := exporter.NewTemplateID() templateSet := entities.NewSet(false) @@ -84,35 +120,10 @@ func SendTemplateRecordv4(log *logrus.Entry, exporter *ipfixExporter.ExportingPr if err != nil { return 0, nil, err } - err = addElementToTemplate(log, "octetDeltaCount", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "flowStartSeconds", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "flowStartMilliseconds", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "flowEndSeconds", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "flowEndMilliseconds", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "packetDeltaCount", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "interfaceName", nil, &elements) + err = AddRecordValuesToTemplate(log, &elements) if err != nil { return 0, nil, err } - err = templateSet.AddRecord(elements, templateID) if err != nil { return 0, nil, err @@ -170,31 +181,7 @@ func SendTemplateRecordv6(log *logrus.Entry, exporter *ipfixExporter.ExportingPr if err != nil { return 0, nil, err } - err = addElementToTemplate(log, "octetDeltaCount", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "flowStartSeconds", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "flowStartMilliseconds", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "flowEndSeconds", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "flowEndMilliseconds", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "packetDeltaCount", nil, &elements) - if err != nil { - return 0, nil, err - } - err = addElementToTemplate(log, "interfaceName", nil, &elements) + err = AddRecordValuesToTemplate(log, &elements) if err != nil { return 0, nil, err } @@ -262,6 +249,27 @@ func setIPv4Address(ieValPtr *entities.InfoElementWithValue, ipAddress net.IP) { ieVal.SetIPAddressValue(ipAddress) } } +func setIERecordValue(record *flow.Record, ieValPtr *entities.InfoElementWithValue) { + ieVal := *ieValPtr + switch ieVal.GetName() { + case "octetDeltaCount": + ieVal.SetUnsigned64Value(record.Bytes) + case "tcpControlBits": + ieVal.SetUnsigned16Value(record.Flags) + case "flowStartSeconds": + ieVal.SetUnsigned32Value(uint32(record.TimeFlowStart.Unix())) + case "flowStartMilliseconds": + ieVal.SetUnsigned64Value(uint64(record.TimeFlowStart.UnixMilli())) + case "flowEndSeconds": + ieVal.SetUnsigned32Value(uint32(record.TimeFlowEnd.Unix())) + case "flowEndMilliseconds": + ieVal.SetUnsigned64Value(uint64(record.TimeFlowEnd.UnixMilli())) + case "packetDeltaCount": + ieVal.SetUnsigned64Value(uint64(record.Packets)) + case "interfaceName": + ieVal.SetStringValue(record.Interface) + } +} func setIEValue(record *flow.Record, ieValPtr *entities.InfoElementWithValue) { ieVal := *ieValPtr switch ieVal.GetName() { @@ -289,26 +297,12 @@ func setIEValue(record *flow.Record, ieValPtr *entities.InfoElementWithValue) { ieVal.SetUnsigned16Value(record.Transport.SrcPort) case "destinationTransportPort": ieVal.SetUnsigned16Value(record.Transport.DstPort) - case "octetDeltaCount": - ieVal.SetUnsigned64Value(record.Bytes) - case "flowStartSeconds": - ieVal.SetUnsigned32Value(uint32(record.TimeFlowStart.Unix())) - case "flowStartMilliseconds": - ieVal.SetUnsigned64Value(uint64(record.TimeFlowStart.UnixMilli())) - case "flowEndSeconds": - ieVal.SetUnsigned32Value(uint32(record.TimeFlowEnd.Unix())) - case "flowEndMilliseconds": - ieVal.SetUnsigned64Value(uint64(record.TimeFlowEnd.UnixMilli())) - case "packetDeltaCount": - ieVal.SetUnsigned64Value(uint64(record.Packets)) - case "interfaceName": - ieVal.SetStringValue(record.Interface) } - } func setEntities(record *flow.Record, elements *[]entities.InfoElementWithValue) { for _, ieVal := range *elements { setIEValue(record, &ieVal) + setIERecordValue(record, &ieVal) } } func (ipf *IPFIX) sendDataRecord(log *logrus.Entry, record *flow.Record, v6 bool) error { diff --git a/pkg/exporter/kafka_proto_test.go b/pkg/exporter/kafka_proto_test.go index 0093df1b255d004cb8e330af929eb32ba295db07..485b4c24b9f75361428d2fb4a45d9f560e577002 100644 --- a/pkg/exporter/kafka_proto_test.go +++ b/pkg/exporter/kafka_proto_test.go @@ -24,7 +24,7 @@ func IPAddrFromNetIP(netIP net.IP) flow.IPAddr { func TestProtoConversion(t *testing.T) { wc := writerCapturer{} kj := KafkaProto{Writer: &wc} - input := make(chan []*flow.Record, 10) + input := make(chan []*flow.Record, 11) record := flow.Record{} record.EthProtocol = 3 record.Direction = 1 @@ -39,6 +39,7 @@ func TestProtoConversion(t *testing.T) { record.TimeFlowEnd = time.Now() record.Bytes = 789 record.Packets = 987 + record.Flags = uint16(1) record.Interface = "veth0" input <- []*flow.Record{&record} @@ -61,6 +62,7 @@ func TestProtoConversion(t *testing.T) { assert.Equal(t, record.TimeFlowEnd.UnixMilli(), r.TimeFlowEnd.AsTime().UnixMilli()) assert.EqualValues(t, 789, r.Bytes) assert.EqualValues(t, 987, r.Packets) + assert.EqualValues(t, uint16(1), r.Flags) assert.Equal(t, "veth0", r.Interface) } diff --git a/pkg/exporter/proto.go b/pkg/exporter/proto.go index a47048b9068016133c57647339f7fdce733674ec..f63743412aedc604f6d63198f8310cb689125832 100644 --- a/pkg/exporter/proto.go +++ b/pkg/exporter/proto.go @@ -64,9 +64,10 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record { Nanos: int32(fr.TimeFlowEnd.Nanosecond()), }, Packets: uint64(fr.Packets), - Interface: fr.Interface, Duplicate: fr.Duplicate, AgentIp: agentIP(fr.AgentIP), + Flags: uint32(fr.Flags), + Interface: string(fr.Interface), } } @@ -97,6 +98,7 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record { Nanos: int32(fr.TimeFlowEnd.Nanosecond()), }, Packets: uint64(fr.Packets), + Flags: uint32(fr.Flags), Interface: fr.Interface, Duplicate: fr.Duplicate, AgentIp: agentIP(fr.AgentIP), diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go index 2bf2bf0485f3d77a6987e22c0bbecd72ea4bdb8d..2e30ca7930c126fd3de3a0e8fab54ca7733dcff1 100644 --- a/pkg/flow/account_test.go +++ b/pkg/flow/account_test.go @@ -55,19 +55,19 @@ func TestEvict_MaxEntries(t *testing.T) { inputs <- &RawRecord{ RecordKey: k1, RecordMetrics: RecordMetrics{ - Bytes: 123, Packets: 1, StartMonoTimeNs: 123, EndMonoTimeNs: 123, + Bytes: 123, Packets: 1, StartMonoTimeNs: 123, EndMonoTimeNs: 123, Flags: 1, }, } inputs <- &RawRecord{ RecordKey: k2, RecordMetrics: RecordMetrics{ - Bytes: 456, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, + Bytes: 456, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, Flags: 1, }, } inputs <- &RawRecord{ RecordKey: k1, RecordMetrics: RecordMetrics{ - Bytes: 321, Packets: 1, StartMonoTimeNs: 789, EndMonoTimeNs: 789, + Bytes: 321, Packets: 1, StartMonoTimeNs: 789, EndMonoTimeNs: 789, Flags: 1, }, } requireNoEviction(t, evictor) @@ -76,7 +76,7 @@ func TestEvict_MaxEntries(t *testing.T) { inputs <- &RawRecord{ RecordKey: k3, RecordMetrics: RecordMetrics{ - Bytes: 111, Packets: 1, StartMonoTimeNs: 888, EndMonoTimeNs: 888, + Bytes: 111, Packets: 1, StartMonoTimeNs: 888, EndMonoTimeNs: 888, Flags: 1, }, } @@ -96,7 +96,7 @@ func TestEvict_MaxEntries(t *testing.T) { RawRecord: RawRecord{ RecordKey: k1, RecordMetrics: RecordMetrics{ - Bytes: 444, Packets: 2, StartMonoTimeNs: 123, EndMonoTimeNs: 789, + Bytes: 444, Packets: 2, StartMonoTimeNs: 123, EndMonoTimeNs: 789, Flags: 1, }, }, TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond), @@ -106,7 +106,7 @@ func TestEvict_MaxEntries(t *testing.T) { RawRecord: RawRecord{ RecordKey: k2, RecordMetrics: RecordMetrics{ - Bytes: 456, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, + Bytes: 456, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, Flags: 1, }, }, TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond), @@ -132,19 +132,19 @@ func TestEvict_Period(t *testing.T) { inputs <- &RawRecord{ RecordKey: k1, RecordMetrics: RecordMetrics{ - Bytes: 10, Packets: 1, StartMonoTimeNs: 123, EndMonoTimeNs: 123, + Bytes: 10, Packets: 1, StartMonoTimeNs: 123, EndMonoTimeNs: 123, Flags: 1, }, } inputs <- &RawRecord{ RecordKey: k1, RecordMetrics: RecordMetrics{ - Bytes: 10, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, + Bytes: 10, Packets: 1, StartMonoTimeNs: 456, EndMonoTimeNs: 456, Flags: 1, }, } inputs <- &RawRecord{ RecordKey: k1, RecordMetrics: RecordMetrics{ - Bytes: 10, Packets: 1, StartMonoTimeNs: 789, EndMonoTimeNs: 789, + Bytes: 10, Packets: 1, StartMonoTimeNs: 789, EndMonoTimeNs: 789, Flags: 1, }, } // Forcing at least one eviction here @@ -152,13 +152,13 @@ func TestEvict_Period(t *testing.T) { inputs <- &RawRecord{ RecordKey: k1, RecordMetrics: RecordMetrics{ - Bytes: 10, Packets: 1, StartMonoTimeNs: 1123, EndMonoTimeNs: 1123, + Bytes: 10, Packets: 1, StartMonoTimeNs: 1123, EndMonoTimeNs: 1123, Flags: 1, }, } inputs <- &RawRecord{ RecordKey: k1, RecordMetrics: RecordMetrics{ - Bytes: 10, Packets: 1, StartMonoTimeNs: 1456, EndMonoTimeNs: 1456, + Bytes: 10, Packets: 1, StartMonoTimeNs: 1456, EndMonoTimeNs: 1456, Flags: 1, }, } @@ -174,6 +174,7 @@ func TestEvict_Period(t *testing.T) { Packets: 3, StartMonoTimeNs: 123, EndMonoTimeNs: 789, + Flags: 1, }, }, TimeFlowStart: now.Add(-1000 + 123), @@ -189,6 +190,7 @@ func TestEvict_Period(t *testing.T) { Packets: 2, StartMonoTimeNs: 1123, EndMonoTimeNs: 1456, + Flags: 1, }, }, TimeFlowStart: now.Add(-1000 + 1123), diff --git a/pkg/flow/deduper_test.go b/pkg/flow/deduper_test.go index 7f3176ffc3b5d913ae39dd8d2617aa20b6e1f953..ec359e7bdbfbcbf7fb115f57ce7af0fee18d0a47 100644 --- a/pkg/flow/deduper_test.go +++ b/pkg/flow/deduper_test.go @@ -13,26 +13,26 @@ var ( EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 123, DstPort: 456}, DataLink: DataLink{DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}}, IFIndex: 1, }, RecordMetrics: RecordMetrics{ - Packets: 2, Bytes: 456, + Packets: 2, Bytes: 456, Flags: 1, }}, Interface: "eth0"} oneIf2 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{ EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 123, DstPort: 456}, DataLink: DataLink{DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}}, IFIndex: 2, }, RecordMetrics: RecordMetrics{ - Packets: 2, Bytes: 456, + Packets: 2, Bytes: 456, Flags: 1, }}, Interface: "123456789"} // another fow from 2 different interfaces and directions twoIf1 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{ EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 333, DstPort: 456}, DataLink: DataLink{DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}}, IFIndex: 1, }, RecordMetrics: RecordMetrics{ - Packets: 2, Bytes: 456, + Packets: 2, Bytes: 456, Flags: 1, }}, Interface: "eth0"} twoIf2 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{ EthProtocol: 1, Direction: 0, Transport: Transport{SrcPort: 333, DstPort: 456}, DataLink: DataLink{DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}}, IFIndex: 2, }, RecordMetrics: RecordMetrics{ - Packets: 2, Bytes: 456, + Packets: 2, Bytes: 456, Flags: 1, }}, Interface: "123456789"} ) diff --git a/pkg/flow/record.go b/pkg/flow/record.go index 665f4ed25422f135ddd4f453d332441fc2162af8..4751c04cb6bbd1865c98a7351da7d348cb5f4c39 100644 --- a/pkg/flow/record.go +++ b/pkg/flow/record.go @@ -65,8 +65,8 @@ type RecordMetrics struct { // and monotime.Now() (user space) StartMonoTimeNs uint64 EndMonoTimeNs uint64 - - Errno uint8 + Flags uint16 + Errno uint8 } // record structure as parsed from eBPF @@ -124,6 +124,7 @@ func (r *RecordMetrics) Accumulate(src *RecordMetrics) { } r.Bytes += src.Bytes r.Packets += src.Packets + r.Flags |= src.Flags } // IP returns the net.IP equivalent object diff --git a/pkg/flow/record_test.go b/pkg/flow/record_test.go index e26618aab9e52bdd6790784c95925c325d3dc5dc..de782e7104f94f3296e0107a5d98a271fe8327f5 100644 --- a/pkg/flow/record_test.go +++ b/pkg/flow/record_test.go @@ -26,7 +26,9 @@ func TestRecordBinaryEncoding(t *testing.T) { 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 bytes 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_start_time 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_end_time + 0x13, 0x14, //flags 0x33, // u8 errno + })) require.NoError(t, err) @@ -54,6 +56,7 @@ func TestRecordBinaryEncoding(t *testing.T) { Bytes: 0x1a19181716151413, StartMonoTimeNs: 0x1a19181716151413, EndMonoTimeNs: 0x1a19181716151413, + Flags: 0x1413, Errno: 0x33, }, }, *fr) diff --git a/pkg/flow/tracer_map_test.go b/pkg/flow/tracer_map_test.go index 74c8c3d2a1636dfc150f9fc5d9203df0ccb146b2..0518159e66d9f5b1af2786092e049244ee614aa3 100644 --- a/pkg/flow/tracer_map_test.go +++ b/pkg/flow/tracer_map_test.go @@ -14,23 +14,23 @@ func TestPacketAggregation(t *testing.T) { } tcs := []testCase{{ input: []RecordMetrics{ - {Packets: 0, Bytes: 0, StartMonoTimeNs: 0, EndMonoTimeNs: 0}, - {Packets: 0x7, Bytes: 0x22d, StartMonoTimeNs: 0x176a790b240b, EndMonoTimeNs: 0x176a792a755b}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0}, + {Packets: 0, Bytes: 0, StartMonoTimeNs: 0, EndMonoTimeNs: 0, Flags: 1}, + {Packets: 0x7, Bytes: 0x22d, StartMonoTimeNs: 0x176a790b240b, EndMonoTimeNs: 0x176a792a755b, Flags: 1}, + {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0, Flags: 1}, + {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0, Flags: 1}, }, expected: RecordMetrics{ - Packets: 0x7, Bytes: 0x22d, StartMonoTimeNs: 0x176a790b240b, EndMonoTimeNs: 0x176a792a755b, + Packets: 0x7, Bytes: 0x22d, StartMonoTimeNs: 0x176a790b240b, EndMonoTimeNs: 0x176a792a755b, Flags: 1, }, }, { input: []RecordMetrics{ - {Packets: 0x3, Bytes: 0x5c4, StartMonoTimeNs: 0x17f3e9613a7f, EndMonoTimeNs: 0x17f3e979816e}, - {Packets: 0x2, Bytes: 0x8c, StartMonoTimeNs: 0x17f3e9633a7f, EndMonoTimeNs: 0x17f3e96f164e}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0}, + {Packets: 0x3, Bytes: 0x5c4, StartMonoTimeNs: 0x17f3e9613a7f, EndMonoTimeNs: 0x17f3e979816e, Flags: 1}, + {Packets: 0x2, Bytes: 0x8c, StartMonoTimeNs: 0x17f3e9633a7f, EndMonoTimeNs: 0x17f3e96f164e, Flags: 1}, + {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0, Flags: 1}, + {Packets: 0x0, Bytes: 0x0, StartMonoTimeNs: 0x0, EndMonoTimeNs: 0x0, Flags: 1}, }, expected: RecordMetrics{ - Packets: 0x5, Bytes: 0x5c4 + 0x8c, StartMonoTimeNs: 0x17f3e9613a7f, EndMonoTimeNs: 0x17f3e979816e, + Packets: 0x5, Bytes: 0x5c4 + 0x8c, StartMonoTimeNs: 0x17f3e9613a7f, EndMonoTimeNs: 0x17f3e979816e, Flags: 1, }, }} ft := MapTracer{} diff --git a/pkg/grpc/grpc_test.go b/pkg/grpc/grpc_test.go index eb2184ab209d24d886a30076352bc95ec97b6de1..fb858433535cb40e7bf6e7e6e12614b2c9cb320c 100644 --- a/pkg/grpc/grpc_test.go +++ b/pkg/grpc/grpc_test.go @@ -29,7 +29,7 @@ func TestGRPCCommunication(t *testing.T) { go func() { _, err = client.Send(context.Background(), &pbflow.Records{Entries: []*pbflow.Record{{ - EthProtocol: 123, Bytes: 456, Network: &pbflow.Network{ + EthProtocol: 123, Flags: 1, Bytes: 456, Network: &pbflow.Network{ SrcAddr: &pbflow.IP{ IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344}, }, @@ -43,7 +43,7 @@ func TestGRPCCommunication(t *testing.T) { require.NoError(t, err) _, err = client.Send(context.Background(), &pbflow.Records{Entries: []*pbflow.Record{{ - EthProtocol: 789, Bytes: 101, Network: &pbflow.Network{ + EthProtocol: 789, Flags: 1, Bytes: 101, Network: &pbflow.Network{ SrcAddr: &pbflow.IP{ IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x44332211}, }, @@ -66,6 +66,7 @@ func TestGRPCCommunication(t *testing.T) { assert.Len(t, rs.Entries, 1) r := rs.Entries[0] assert.EqualValues(t, 123, r.EthProtocol) + assert.EqualValues(t, 1, r.Flags) assert.EqualValues(t, 456, r.Bytes) assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4()) assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4()) @@ -78,6 +79,7 @@ func TestGRPCCommunication(t *testing.T) { assert.Len(t, rs.Entries, 1) r = rs.Entries[0] assert.EqualValues(t, 789, r.EthProtocol) + assert.EqualValues(t, 1, r.Flags) assert.EqualValues(t, 101, r.Bytes) assert.EqualValues(t, 0x44332211, r.GetNetwork().GetSrcAddr().GetIpv4()) assert.EqualValues(t, uint64(0x88776655), r.GetNetwork().GetDstAddr().GetIpv4()) @@ -114,7 +116,7 @@ func TestConstructorOptions(t *testing.T) { go func() { _, err = client.Send(context.Background(), - &pbflow.Records{Entries: []*pbflow.Record{{EthProtocol: 123, Bytes: 456}}}) + &pbflow.Records{Entries: []*pbflow.Record{{EthProtocol: 123, Bytes: 456, Flags: 1}}}) require.NoError(t, err) }() @@ -140,6 +142,7 @@ func BenchmarkGRPCCommunication(b *testing.B) { f := &pbflow.Record{ EthProtocol: 2048, Bytes: 456, + Flags: 1, Direction: pbflow.Direction_EGRESS, TimeFlowStart: timestamppb.Now(), TimeFlowEnd: timestamppb.Now(), diff --git a/pkg/pbflow/flow.pb.go b/pkg/pbflow/flow.pb.go index 4458513eb51f3c03dc69d209605bbdcc6237ab04..6306ac87ef7dbb8025ad3665f7a2405aeaf91ce9 100644 --- a/pkg/pbflow/flow.pb.go +++ b/pkg/pbflow/flow.pb.go @@ -1,15 +1,15 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.19.4 +// protoc-gen-go v1.27.1 +// protoc v3.12.4 // source: proto/flow.proto package pbflow import ( + timestamp "github.com/golang/protobuf/ptypes/timestamp" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -162,10 +162,10 @@ type Record struct { // protocol as defined by ETH_P_* in linux/if_ether.h // https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_ether.h - EthProtocol uint32 `protobuf:"varint,1,opt,name=eth_protocol,json=ethProtocol,proto3" json:"eth_protocol,omitempty"` - Direction Direction `protobuf:"varint,2,opt,name=direction,proto3,enum=pbflow.Direction" json:"direction,omitempty"` - TimeFlowStart *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=time_flow_start,json=timeFlowStart,proto3" json:"time_flow_start,omitempty"` - TimeFlowEnd *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=time_flow_end,json=timeFlowEnd,proto3" json:"time_flow_end,omitempty"` + EthProtocol uint32 `protobuf:"varint,1,opt,name=eth_protocol,json=ethProtocol,proto3" json:"eth_protocol,omitempty"` + Direction Direction `protobuf:"varint,2,opt,name=direction,proto3,enum=pbflow.Direction" json:"direction,omitempty"` + TimeFlowStart *timestamp.Timestamp `protobuf:"bytes,3,opt,name=time_flow_start,json=timeFlowStart,proto3" json:"time_flow_start,omitempty"` + TimeFlowEnd *timestamp.Timestamp `protobuf:"bytes,4,opt,name=time_flow_end,json=timeFlowEnd,proto3" json:"time_flow_end,omitempty"` // OSI-layer attributes DataLink *DataLink `protobuf:"bytes,5,opt,name=data_link,json=dataLink,proto3" json:"data_link,omitempty"` Network *Network `protobuf:"bytes,6,opt,name=network,proto3" json:"network,omitempty"` @@ -177,7 +177,8 @@ type Record struct { // From all the duplicate flows, one will set this value to false and the rest will be true. Duplicate bool `protobuf:"varint,11,opt,name=duplicate,proto3" json:"duplicate,omitempty"` // Agent IP address to help identifying the source of the flow - AgentIp *IP `protobuf:"bytes,12,opt,name=agent_ip,json=agentIp,proto3" json:"agent_ip,omitempty"` + AgentIp *IP `protobuf:"bytes,12,opt,name=agent_ip,json=agentIp,proto3" json:"agent_ip,omitempty"` + Flags uint32 `protobuf:"varint,13,opt,name=flags,proto3" json:"flags,omitempty"` } func (x *Record) Reset() { @@ -226,14 +227,14 @@ func (x *Record) GetDirection() Direction { return Direction_INGRESS } -func (x *Record) GetTimeFlowStart() *timestamppb.Timestamp { +func (x *Record) GetTimeFlowStart() *timestamp.Timestamp { if x != nil { return x.TimeFlowStart } return nil } -func (x *Record) GetTimeFlowEnd() *timestamppb.Timestamp { +func (x *Record) GetTimeFlowEnd() *timestamp.Timestamp { if x != nil { return x.TimeFlowEnd } @@ -296,6 +297,13 @@ func (x *Record) GetAgentIp() *IP { return nil } +func (x *Record) GetFlags() uint32 { + if x != nil { + return x.Flags + } + return 0 +} + type DataLink struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -562,7 +570,7 @@ var file_proto_flow_proto_rawDesc = []byte{ 0x07, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x28, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, - 0x65, 0x73, 0x22, 0xfe, 0x03, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x21, 0x0a, + 0x65, 0x73, 0x22, 0x94, 0x04, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x65, 0x74, 0x68, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x65, 0x74, 0x68, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, @@ -594,34 +602,35 @@ var file_proto_flow_proto_rawDesc = []byte{ 0x08, 0x52, 0x09, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, - 0x74, 0x49, 0x70, 0x22, 0x3c, 0x0a, 0x08, 0x44, 0x61, 0x74, 0x61, 0x4c, 0x69, 0x6e, 0x6b, 0x12, - 0x17, 0x0a, 0x07, 0x73, 0x72, 0x63, 0x5f, 0x6d, 0x61, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, - 0x52, 0x06, 0x73, 0x72, 0x63, 0x4d, 0x61, 0x63, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x73, 0x74, 0x5f, - 0x6d, 0x61, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x64, 0x73, 0x74, 0x4d, 0x61, - 0x63, 0x22, 0x57, 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x25, 0x0a, 0x08, - 0x73, 0x72, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, - 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x73, 0x72, 0x63, 0x41, - 0x64, 0x64, 0x72, 0x12, 0x25, 0x0a, 0x08, 0x64, 0x73, 0x74, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, - 0x50, 0x52, 0x07, 0x64, 0x73, 0x74, 0x41, 0x64, 0x64, 0x72, 0x22, 0x3d, 0x0a, 0x02, 0x49, 0x50, - 0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x34, 0x18, 0x01, 0x20, 0x01, 0x28, 0x07, 0x48, 0x00, - 0x52, 0x04, 0x69, 0x70, 0x76, 0x34, 0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x36, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x04, 0x69, 0x70, 0x76, 0x36, 0x42, 0x0b, 0x0a, 0x09, - 0x69, 0x70, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x22, 0x5d, 0x0a, 0x09, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x5f, 0x70, 0x6f, - 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x73, 0x72, 0x63, 0x50, 0x6f, 0x72, - 0x74, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x73, 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0d, 0x52, 0x07, 0x64, 0x73, 0x74, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2a, 0x24, 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53, - 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x45, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x01, 0x32, 0x3e, - 0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x31, 0x0a, 0x04, 0x53, - 0x65, 0x6e, 0x64, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, - 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x16, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x43, 0x6f, - 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, - 0x5a, 0x08, 0x2e, 0x2f, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x74, 0x49, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x0d, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x22, 0x3c, 0x0a, 0x08, 0x44, 0x61, 0x74, + 0x61, 0x4c, 0x69, 0x6e, 0x6b, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x72, 0x63, 0x5f, 0x6d, 0x61, 0x63, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x73, 0x72, 0x63, 0x4d, 0x61, 0x63, 0x12, 0x17, + 0x0a, 0x07, 0x64, 0x73, 0x74, 0x5f, 0x6d, 0x61, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x06, 0x64, 0x73, 0x74, 0x4d, 0x61, 0x63, 0x22, 0x57, 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, + 0x72, 0x6b, 0x12, 0x25, 0x0a, 0x08, 0x73, 0x72, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, + 0x52, 0x07, 0x73, 0x72, 0x63, 0x41, 0x64, 0x64, 0x72, 0x12, 0x25, 0x0a, 0x08, 0x64, 0x73, 0x74, + 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x62, + 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x50, 0x52, 0x07, 0x64, 0x73, 0x74, 0x41, 0x64, 0x64, 0x72, + 0x22, 0x3d, 0x0a, 0x02, 0x49, 0x50, 0x12, 0x14, 0x0a, 0x04, 0x69, 0x70, 0x76, 0x34, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x07, 0x48, 0x00, 0x52, 0x04, 0x69, 0x70, 0x76, 0x34, 0x12, 0x14, 0x0a, 0x04, + 0x69, 0x70, 0x76, 0x36, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x04, 0x69, 0x70, + 0x76, 0x36, 0x42, 0x0b, 0x0a, 0x09, 0x69, 0x70, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x22, + 0x5d, 0x0a, 0x09, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x19, 0x0a, 0x08, + 0x73, 0x72, 0x63, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, + 0x73, 0x72, 0x63, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x73, 0x74, 0x5f, 0x70, + 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x64, 0x73, 0x74, 0x50, 0x6f, + 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2a, 0x24, + 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0b, 0x0a, 0x07, 0x49, + 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x45, 0x47, 0x52, 0x45, + 0x53, 0x53, 0x10, 0x01, 0x32, 0x3e, 0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, + 0x72, 0x12, 0x31, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x66, 0x6c, + 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x1a, 0x16, 0x2e, 0x70, 0x62, 0x66, + 0x6c, 0x6f, 0x77, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -639,15 +648,15 @@ func file_proto_flow_proto_rawDescGZIP() []byte { var file_proto_flow_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_proto_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_proto_flow_proto_goTypes = []interface{}{ - (Direction)(0), // 0: pbflow.Direction - (*CollectorReply)(nil), // 1: pbflow.CollectorReply - (*Records)(nil), // 2: pbflow.Records - (*Record)(nil), // 3: pbflow.Record - (*DataLink)(nil), // 4: pbflow.DataLink - (*Network)(nil), // 5: pbflow.Network - (*IP)(nil), // 6: pbflow.IP - (*Transport)(nil), // 7: pbflow.Transport - (*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp + (Direction)(0), // 0: pbflow.Direction + (*CollectorReply)(nil), // 1: pbflow.CollectorReply + (*Records)(nil), // 2: pbflow.Records + (*Record)(nil), // 3: pbflow.Record + (*DataLink)(nil), // 4: pbflow.DataLink + (*Network)(nil), // 5: pbflow.Network + (*IP)(nil), // 6: pbflow.IP + (*Transport)(nil), // 7: pbflow.Transport + (*timestamp.Timestamp)(nil), // 8: google.protobuf.Timestamp } var file_proto_flow_proto_depIdxs = []int32{ 3, // 0: pbflow.Records.entries:type_name -> pbflow.Record diff --git a/pkg/pbflow/flow_grpc.pb.go b/pkg/pbflow/flow_grpc.pb.go index 11aee3668bcafaf805f490d620f92efd5ac424ca..9b91b5efd631eb191d2c8310644050b31b75d2bb 100644 --- a/pkg/pbflow/flow_grpc.pb.go +++ b/pkg/pbflow/flow_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.19.4 +// - protoc v3.12.4 // source: proto/flow.proto package pbflow diff --git a/proto/flow.proto b/proto/flow.proto index 8b5994eec91743fe5f9378377a576e9aab12271f..9e291f7292d0ed72ba6bbcd528355e752f1d9bc6 100644 --- a/proto/flow.proto +++ b/proto/flow.proto @@ -40,6 +40,7 @@ message Record { // Agent IP address to help identifying the source of the flow IP agent_ip = 12; + uint32 flags = 13; } message DataLink {