diff --git a/bpf/dns_tracker.h b/bpf/dns_tracker.h index f3aa795f37f65d1d68b7fee4b92b6e32aff738b2..f7918ea10c282d9995894ee2f281351cf9880102 100644 --- a/bpf/dns_tracker.h +++ b/bpf/dns_tracker.h @@ -19,13 +19,13 @@ struct dns_header { u16 arcount; }; -static inline void find_or_create_dns_flow(flow_id *id, struct dns_header *dns, int len, int dir, u16 flags) { +static inline void find_or_create_dns_flow(flow_id *id, struct dns_header *dns, int len, u16 flags, u64 latency) { flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id); u64 current_time = bpf_ktime_get_ns(); // net_dev_queue trace point hook will run before TC hooks, so the flow shouldn't exists, if it does // that indicates we have a stale DNS query/response or in the middle of TCP flow so we will do nothing if (aggregate_flow == NULL) { - // there is no matching flows so lets create new one and add the drops + // there is no matching flows so lets create new one and dns info flow_metrics new_flow; __builtin_memset(&new_flow, 0, sizeof(new_flow)); new_flow.start_mono_time_ts = current_time; @@ -35,15 +35,28 @@ static inline void find_or_create_dns_flow(flow_id *id, struct dns_header *dns, new_flow.flags = flags; new_flow.dns_record.id = bpf_ntohs(dns->id); new_flow.dns_record.flags = bpf_ntohs(dns->flags); - if (dir == EGRESS) { - new_flow.dns_record.req_mono_time_ts = current_time; - } else { - new_flow.dns_record.rsp_mono_time_ts = current_time; - } + new_flow.dns_record.latency = latency; bpf_map_update_elem(&aggregated_flows, id, &new_flow, BPF_ANY); } } +static inline void fill_dns_id (flow_id *id, dns_flow_id *dns_flow, u16 dns_id, bool reverse) { + dns_flow->id = dns_id; + dns_flow->if_index = id->if_index; + dns_flow->protocol = id->transport_protocol; + if (reverse) { + __builtin_memcpy(dns_flow->src_ip, id->dst_ip, IP_MAX_LEN); + __builtin_memcpy(dns_flow->dst_ip, id->src_ip, IP_MAX_LEN); + dns_flow->src_port = id->dst_port; + dns_flow->dst_port = id->src_port; + } else { + __builtin_memcpy(dns_flow->src_ip, id->src_ip, IP_MAX_LEN); + __builtin_memcpy(dns_flow->dst_ip, id->dst_ip, IP_MAX_LEN); + dns_flow->src_port = id->src_port; + dns_flow->dst_port = id->dst_port; + } +} + static inline int trace_dns(struct sk_buff *skb) { flow_id id; u8 protocol = 0; @@ -79,13 +92,26 @@ static inline int trace_dns(struct sk_buff *skb) { // check for DNS packets if (id.dst_port == DNS_PORT || id.src_port == DNS_PORT) { struct dns_header dns; + dns_flow_id dns_req; + u64 latency = 0; bpf_probe_read(&dns, sizeof(dns), (struct dns_header *)(skb->head + skb->transport_header + len)); if ((bpf_ntohs(dns.flags) & DNS_QR_FLAG) == 0) { /* dns query */ + fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), false); + if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) { + u64 ts = bpf_ktime_get_ns(); + bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY); + } id.direction = EGRESS; } else { /* dns response */ id.direction = INGRESS; + fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), true); + u64 *value = bpf_map_lookup_elem(&dns_flows, &dns_req); + if (value != NULL) { + latency = bpf_ktime_get_ns() - *value; + bpf_map_delete_elem(&dns_flows, &dns_req); + find_or_create_dns_flow(&id, &dns, skb->len, flags, latency); + } } // end of dns response - find_or_create_dns_flow(&id, &dns, skb->len, id.direction, flags); } // end of dns port check return 0; diff --git a/bpf/maps_definition.h b/bpf/maps_definition.h index 4c5bf86e718587adc2633c371e3bde27586adcd5..a7dca2a30f872197c6a56024b536a22b1ee01bb7 100644 --- a/bpf/maps_definition.h +++ b/bpf/maps_definition.h @@ -31,4 +31,12 @@ struct { __type(value, u64); } flow_sequences SEC(".maps"); +// DNS tracking flow based hashmap used to correlate query and responses +// to allow calculating latency in ebpf agent directly +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __uint(max_entries, 1 << 20); // Will take around 64MB of space. + __type(key, dns_flow_id); + __type(value, u64); +} dns_flows SEC(".maps"); #endif //__MAPS_DEFINITION_H__ diff --git a/bpf/types.h b/bpf/types.h index 0ce9037590718be2739d70478984ea1647c2ee8c..be511ce964d765700b983bc5db43e7ce6bb9733b 100644 --- a/bpf/types.h +++ b/bpf/types.h @@ -87,8 +87,7 @@ typedef struct flow_metrics_t { struct dns_record_t { u16 id; u16 flags; - u64 req_mono_time_ts; - u64 rsp_mono_time_ts; + u64 latency; } __attribute__((packed)) dns_record; u64 flow_rtt; } __attribute__((packed)) flow_metrics; @@ -158,5 +157,16 @@ typedef struct pkt_info_t { u64 rtt; // rtt calculated from the flow if possible. else zero } pkt_info; +// DNS Flow record used as key to correlate DNS query and response +typedef struct dns_flow_id_t { + u16 src_port; + u16 dst_port; + u8 src_ip[IP_MAX_LEN]; + u8 dst_ip[IP_MAX_LEN]; + u16 id; + u32 if_index; + u8 protocol; +} __attribute__((packed)) dns_flow_id; + #endif /* __TYPES_H__ */ diff --git a/examples/flowlogs-dump/server/flowlogs-dump-collector.go b/examples/flowlogs-dump/server/flowlogs-dump-collector.go index b083a96ee300405b804abd30edc4584b290b7e76..c8c6f2f66bd3f14448d31e271672d4a96fb65ae1 100644 --- a/examples/flowlogs-dump/server/flowlogs-dump-collector.go +++ b/examples/flowlogs-dump/server/flowlogs-dump-collector.go @@ -72,7 +72,7 @@ func main() { for records := range receivedRecords { for _, record := range records.Entries { if record.EthProtocol == ipv6 { - log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsReq: %v dnsRsp: %v rtt %v\n", + log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt %v\n", ipProto[record.EthProtocol], record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"), record.Interface, @@ -90,12 +90,11 @@ func main() { record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"), record.GetDnsId(), record.GetDnsFlags(), - record.GetTimeDnsReq(), - record.GetTimeDnsRsp(), + record.DnsLatency.AsDuration().Milliseconds(), record.TimeFlowRtt.AsDuration().Microseconds(), ) } else { - log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsReq: %v dnsRsp: %v rtt %v\n", + log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt %v\n", ipProto[record.EthProtocol], record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"), record.Interface, @@ -113,8 +112,7 @@ func main() { record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"), record.GetDnsId(), record.GetDnsFlags(), - record.GetTimeDnsReq(), - record.GetTimeDnsRsp(), + record.DnsLatency.AsDuration().Milliseconds(), record.TimeFlowRtt.AsDuration().Microseconds(), ) } diff --git a/pkg/ebpf/bpf_bpfeb.go b/pkg/ebpf/bpf_bpfeb.go index 3171606c1f2ce6b5a39a90f6bd5ba5bb4094551d..146c17c804f7e59880d382757c589c95dbfa116b 100644 --- a/pkg/ebpf/bpf_bpfeb.go +++ b/pkg/ebpf/bpf_bpfeb.go @@ -13,11 +13,20 @@ import ( "github.com/cilium/ebpf" ) +type BpfDnsFlowId struct { + SrcPort uint16 + DstPort uint16 + SrcIp [16]uint8 + DstIp [16]uint8 + Id uint16 + IfIndex uint32 + Protocol uint8 +} + type BpfDnsRecordT struct { - Id uint16 - Flags uint16 - ReqMonoTimeTs uint64 - RspMonoTimeTs uint64 + Id uint16 + Flags uint16 + Latency uint64 } type BpfFlowId BpfFlowIdT @@ -125,6 +134,7 @@ type BpfProgramSpecs struct { type BpfMapSpecs struct { AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"` DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"` + DnsFlows *ebpf.MapSpec `ebpf:"dns_flows"` FlowSequences *ebpf.MapSpec `ebpf:"flow_sequences"` } @@ -149,6 +159,7 @@ func (o *BpfObjects) Close() error { type BpfMaps struct { AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"` DirectFlows *ebpf.Map `ebpf:"direct_flows"` + DnsFlows *ebpf.Map `ebpf:"dns_flows"` FlowSequences *ebpf.Map `ebpf:"flow_sequences"` } @@ -156,6 +167,7 @@ func (m *BpfMaps) Close() error { return _BpfClose( m.AggregatedFlows, m.DirectFlows, + m.DnsFlows, m.FlowSequences, ) } @@ -189,5 +201,6 @@ func _BpfClose(closers ...io.Closer) error { } // Do not access this directly. +// //go:embed bpf_bpfeb.o var _BpfBytes []byte diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o index b606e40943a81445fb93153d1c64678ddf091639..f80979eef561c5afee76e004150cdf56aa4f4db6 100644 Binary files a/pkg/ebpf/bpf_bpfeb.o and b/pkg/ebpf/bpf_bpfeb.o differ diff --git a/pkg/ebpf/bpf_bpfel.go b/pkg/ebpf/bpf_bpfel.go index 32e900b5beba46c86420e33a7d0cadadc4887b13..82a8755be16c25fac73516ddbc5c151dd94edbc6 100644 --- a/pkg/ebpf/bpf_bpfel.go +++ b/pkg/ebpf/bpf_bpfel.go @@ -13,11 +13,20 @@ import ( "github.com/cilium/ebpf" ) +type BpfDnsFlowId struct { + SrcPort uint16 + DstPort uint16 + SrcIp [16]uint8 + DstIp [16]uint8 + Id uint16 + IfIndex uint32 + Protocol uint8 +} + type BpfDnsRecordT struct { - Id uint16 - Flags uint16 - ReqMonoTimeTs uint64 - RspMonoTimeTs uint64 + Id uint16 + Flags uint16 + Latency uint64 } type BpfFlowId BpfFlowIdT @@ -125,6 +134,7 @@ type BpfProgramSpecs struct { type BpfMapSpecs struct { AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"` DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"` + DnsFlows *ebpf.MapSpec `ebpf:"dns_flows"` FlowSequences *ebpf.MapSpec `ebpf:"flow_sequences"` } @@ -149,6 +159,7 @@ func (o *BpfObjects) Close() error { type BpfMaps struct { AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"` DirectFlows *ebpf.Map `ebpf:"direct_flows"` + DnsFlows *ebpf.Map `ebpf:"dns_flows"` FlowSequences *ebpf.Map `ebpf:"flow_sequences"` } @@ -156,6 +167,7 @@ func (m *BpfMaps) Close() error { return _BpfClose( m.AggregatedFlows, m.DirectFlows, + m.DnsFlows, m.FlowSequences, ) } @@ -189,5 +201,6 @@ func _BpfClose(closers ...io.Closer) error { } // Do not access this directly. +// //go:embed bpf_bpfel.o var _BpfBytes []byte diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o index 3e563be5bf9bf32c326400fadfa82b30382904ef..5db3687c2899c17c044af1dc776c3652c2b4000c 100644 Binary files a/pkg/ebpf/bpf_bpfel.o and b/pkg/ebpf/bpf_bpfel.o differ diff --git a/pkg/exporter/proto.go b/pkg/exporter/proto.go index 1fa61db04a02d7f82026ba50c305ae745a14e17f..2a574adba1cc0312f76d104565d74ee6c13c1b36 100644 --- a/pkg/exporter/proto.go +++ b/pkg/exporter/proto.go @@ -80,17 +80,8 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record { DnsFlags: uint32(fr.Metrics.DnsRecord.Flags), TimeFlowRtt: durationpb.New(fr.TimeFlowRtt), } - if fr.Metrics.DnsRecord.ReqMonoTimeTs != 0 { - pbflowRecord.TimeDnsReq = ×tamppb.Timestamp{ - Seconds: fr.TimeDNSRequest.Unix(), - Nanos: int32(fr.TimeDNSRequest.Nanosecond()), - } - } - if fr.Metrics.DnsRecord.RspMonoTimeTs != 0 { - pbflowRecord.TimeDnsRsp = ×tamppb.Timestamp{ - Seconds: fr.TimeDNSResponse.Unix(), - Nanos: int32(fr.TimeDNSResponse.Nanosecond()), - } + if fr.Metrics.DnsRecord.Latency != 0 { + pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency) } return &pbflowRecord } @@ -137,17 +128,8 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record { DnsFlags: uint32(fr.Metrics.DnsRecord.Flags), TimeFlowRtt: durationpb.New(fr.TimeFlowRtt), } - if fr.Metrics.DnsRecord.ReqMonoTimeTs != 0 { - pbflowRecord.TimeDnsReq = ×tamppb.Timestamp{ - Seconds: fr.TimeDNSRequest.Unix(), - Nanos: int32(fr.TimeDNSRequest.Nanosecond()), - } - } - if fr.Metrics.DnsRecord.RspMonoTimeTs != 0 { - pbflowRecord.TimeDnsRsp = ×tamppb.Timestamp{ - Seconds: fr.TimeDNSResponse.Unix(), - Nanos: int32(fr.TimeDNSResponse.Nanosecond()), - } + if fr.Metrics.DnsRecord.Latency != 0 { + pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency) } return &pbflowRecord } diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go index 211348fa78ad7ec8646e9fa47ed3a211b5b45b4f..c53df299c17401c296ccc2895a48913abd8b1206 100644 --- a/pkg/flow/account_test.go +++ b/pkg/flow/account_test.go @@ -64,30 +64,18 @@ func TestEvict_MaxEntries(t *testing.T) { Id: k1, Metrics: ebpf.BpfFlowMetrics{ Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 123, - RspMonoTimeTs: 0, - }, }, } inputs <- &RawRecord{ Id: k2, Metrics: ebpf.BpfFlowMetrics{ Bytes: 456, Packets: 1, StartMonoTimeTs: 456, EndMonoTimeTs: 456, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 456, - RspMonoTimeTs: 0, - }, }, } inputs <- &RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ Bytes: 321, Packets: 1, StartMonoTimeTs: 789, EndMonoTimeTs: 789, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 789, - RspMonoTimeTs: 789, - }, }, } requireNoEviction(t, evictor) @@ -97,10 +85,6 @@ func TestEvict_MaxEntries(t *testing.T) { Id: k3, Metrics: ebpf.BpfFlowMetrics{ Bytes: 111, Packets: 1, StartMonoTimeTs: 888, EndMonoTimeTs: 888, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 888, - RspMonoTimeTs: 888, - }, }, } @@ -121,30 +105,20 @@ func TestEvict_MaxEntries(t *testing.T) { Id: k1, Metrics: ebpf.BpfFlowMetrics{ Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 123, - RspMonoTimeTs: 0, - }, }, }, - TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond), - TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond), - TimeDNSRequest: now.Add(-(1000 - 123) * time.Nanosecond), + TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond), + TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond), }, k2: { RawRecord: RawRecord{ Id: k2, Metrics: ebpf.BpfFlowMetrics{ Bytes: 456, Packets: 1, StartMonoTimeTs: 456, EndMonoTimeTs: 456, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 456, - RspMonoTimeTs: 0, - }, }, }, - TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond), - TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond), - TimeDNSRequest: now.Add(-(1000 - 456) * time.Nanosecond), + TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond), + TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond), }, }, received) } @@ -167,30 +141,18 @@ func TestEvict_Period(t *testing.T) { Id: k1, Metrics: ebpf.BpfFlowMetrics{ Bytes: 10, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 123, - RspMonoTimeTs: 0, - }, }, } inputs <- &RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ Bytes: 10, Packets: 1, StartMonoTimeTs: 456, EndMonoTimeTs: 456, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 456, - RspMonoTimeTs: 0, - }, }, } inputs <- &RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ Bytes: 10, Packets: 1, StartMonoTimeTs: 789, EndMonoTimeTs: 789, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 789, - RspMonoTimeTs: 0, - }, }, } // Forcing at least one eviction here @@ -199,20 +161,12 @@ func TestEvict_Period(t *testing.T) { Id: k1, Metrics: ebpf.BpfFlowMetrics{ Bytes: 10, Packets: 1, StartMonoTimeTs: 1123, EndMonoTimeTs: 1123, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 1123, - RspMonoTimeTs: 0, - }, }, } inputs <- &RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ Bytes: 10, Packets: 1, StartMonoTimeTs: 1456, EndMonoTimeTs: 1456, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 1456, - RspMonoTimeTs: 0, - }, }, } @@ -229,15 +183,10 @@ func TestEvict_Period(t *testing.T) { StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 123, - RspMonoTimeTs: 0, - }, }, }, - TimeFlowStart: now.Add(-1000 + 123), - TimeFlowEnd: now.Add(-1000 + 123), - TimeDNSRequest: now.Add(-1000 + 123), + TimeFlowStart: now.Add(-1000 + 123), + TimeFlowEnd: now.Add(-1000 + 123), }, *records[0]) records = receiveTimeout(t, evictor) require.Len(t, records, 1) @@ -250,15 +199,10 @@ func TestEvict_Period(t *testing.T) { StartMonoTimeTs: 1123, EndMonoTimeTs: 1123, Flags: 1, - DnsRecord: ebpf.BpfDnsRecordT{ - ReqMonoTimeTs: 1123, - RspMonoTimeTs: 0, - }, }, }, - TimeFlowStart: now.Add(-1000 + 1123), - TimeFlowEnd: now.Add(-1000 + 1123), - TimeDNSRequest: now.Add(-1000 + 1123), + TimeFlowStart: now.Add(-1000 + 1123), + TimeFlowEnd: now.Add(-1000 + 1123), }, *records[0]) // no more flows are evicted diff --git a/pkg/flow/record.go b/pkg/flow/record.go index ef9ea23f9c46ba09bc7581a27e8e28005d09bbdb..f3e173da102e354a4da2cf798e02b7e10d59ff93 100644 --- a/pkg/flow/record.go +++ b/pkg/flow/record.go @@ -37,11 +37,10 @@ type RawRecord ebpf.BpfFlowRecordT type Record struct { RawRecord // TODO: redundant field from RecordMetrics. Reorganize structs - TimeFlowStart time.Time - TimeFlowEnd time.Time - TimeDNSRequest time.Time - TimeDNSResponse time.Time - Interface string + TimeFlowStart time.Time + TimeFlowEnd time.Time + DNSLatency time.Duration + Interface string // Duplicate tells whether this flow has another duplicate so it has to be excluded from // any metrics' aggregation (e.g. bytes/second rates between two pods). // The reason for this field is that the same flow can be observed from multiple interfaces, @@ -74,16 +73,10 @@ func NewRecord( TimeFlowEnd: currentTime.Add(-endDelta), } if metrics.FlowRtt != 0 { - rttDelta := time.Duration(metrics.FlowRtt) - record.TimeFlowRtt = rttDelta + record.TimeFlowRtt = time.Duration(metrics.FlowRtt) } - if metrics.DnsRecord.ReqMonoTimeTs != 0 { - reqDNS := time.Duration(monotonicCurrentTime - metrics.DnsRecord.ReqMonoTimeTs) - record.TimeDNSRequest = currentTime.Add(-reqDNS) - } - if metrics.DnsRecord.RspMonoTimeTs != 0 { - rspDNS := time.Duration(monotonicCurrentTime - metrics.DnsRecord.RspMonoTimeTs) - record.TimeDNSResponse = currentTime.Add(-rspDNS) + if metrics.DnsRecord.Latency != 0 { + record.DNSLatency = time.Duration(metrics.DnsRecord.Latency) } return &record } diff --git a/pkg/flow/record_test.go b/pkg/flow/record_test.go index 772f4c715446f3a521bd52f18a81c70c9b02bdd3..9af0a7228b5b897ed4e0bf78406b121b0de91560 100644 --- a/pkg/flow/record_test.go +++ b/pkg/flow/record_test.go @@ -41,8 +41,7 @@ func TestRecordBinaryEncoding(t *testing.T) { // dns_record structure 01, 00, // id 0x80, 00, // flags - 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, // req ts - 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, // rsp ts + 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, // latency // u64 flow_rtt 0xad, 0xde, 0xef, 0xbe, 0xef, 0xbe, 0xad, 0xde, })) @@ -78,10 +77,9 @@ func TestRecordBinaryEncoding(t *testing.T) { LatestDropCause: 0x11, }, DnsRecord: ebpf.BpfDnsRecordT{ - Id: 0x0001, - Flags: 0x0080, - ReqMonoTimeTs: 0x1817161514131211, - RspMonoTimeTs: 0x2827262524232221, + Id: 0x0001, + Flags: 0x0080, + Latency: 0x1817161514131211, }, FlowRtt: 0xdeadbeefbeefdead, }, diff --git a/pkg/grpc/grpc_test.go b/pkg/grpc/grpc_test.go index 4328dc174a41469688837b3c636d4aba264d98a6..b25604579743a55870064601886946b5dbb1e2ac 100644 --- a/pkg/grpc/grpc_test.go +++ b/pkg/grpc/grpc_test.go @@ -170,8 +170,6 @@ func BenchmarkIPv4GRPCCommunication(b *testing.B) { TcpDropLatestFlags: 1, TcpDropLatestState: 2, TcpDropLatestDropCause: 3, - TimeDnsReq: timestamppb.Now(), - TimeDnsRsp: timestamppb.Now(), } records := &pbflow.Records{} for i := 0; i < 100; i++ { @@ -230,8 +228,6 @@ func BenchmarkIPv6GRPCCommunication(b *testing.B) { TcpDropLatestDropCause: 3, DnsId: 1, DnsFlags: 100, - TimeDnsReq: timestamppb.Now(), - TimeDnsRsp: timestamppb.Now(), } records := &pbflow.Records{} for i := 0; i < 100; i++ { diff --git a/pkg/pbflow/flow.pb.go b/pkg/pbflow/flow.pb.go index 846d7467b8e50b3924cc8164a32bc56ed99e7e91..10288e9e10deaad0c683e0a66c3b7463b817d232 100644 --- a/pkg/pbflow/flow.pb.go +++ b/pkg/pbflow/flow.pb.go @@ -178,20 +178,19 @@ type Record struct { // From all the duplicate flows, one will set this value to false and the rest will be true. Duplicate bool `protobuf:"varint,11,opt,name=duplicate,proto3" json:"duplicate,omitempty"` // Agent IP address to help identifying the source of the flow - AgentIp *IP `protobuf:"bytes,12,opt,name=agent_ip,json=agentIp,proto3" json:"agent_ip,omitempty"` - Flags uint32 `protobuf:"varint,13,opt,name=flags,proto3" json:"flags,omitempty"` - IcmpType uint32 `protobuf:"varint,14,opt,name=icmp_type,json=icmpType,proto3" json:"icmp_type,omitempty"` - IcmpCode uint32 `protobuf:"varint,15,opt,name=icmp_code,json=icmpCode,proto3" json:"icmp_code,omitempty"` - TcpDropBytes uint64 `protobuf:"varint,16,opt,name=tcp_drop_bytes,json=tcpDropBytes,proto3" json:"tcp_drop_bytes,omitempty"` - TcpDropPackets uint64 `protobuf:"varint,17,opt,name=tcp_drop_packets,json=tcpDropPackets,proto3" json:"tcp_drop_packets,omitempty"` - TcpDropLatestFlags uint32 `protobuf:"varint,18,opt,name=tcp_drop_latest_flags,json=tcpDropLatestFlags,proto3" json:"tcp_drop_latest_flags,omitempty"` - TcpDropLatestState uint32 `protobuf:"varint,19,opt,name=tcp_drop_latest_state,json=tcpDropLatestState,proto3" json:"tcp_drop_latest_state,omitempty"` - TcpDropLatestDropCause uint32 `protobuf:"varint,20,opt,name=tcp_drop_latest_drop_cause,json=tcpDropLatestDropCause,proto3" json:"tcp_drop_latest_drop_cause,omitempty"` - DnsId uint32 `protobuf:"varint,21,opt,name=dns_id,json=dnsId,proto3" json:"dns_id,omitempty"` - DnsFlags uint32 `protobuf:"varint,22,opt,name=dns_flags,json=dnsFlags,proto3" json:"dns_flags,omitempty"` - TimeDnsReq *timestamppb.Timestamp `protobuf:"bytes,23,opt,name=time_dns_req,json=timeDnsReq,proto3" json:"time_dns_req,omitempty"` - TimeDnsRsp *timestamppb.Timestamp `protobuf:"bytes,24,opt,name=time_dns_rsp,json=timeDnsRsp,proto3" json:"time_dns_rsp,omitempty"` - TimeFlowRtt *durationpb.Duration `protobuf:"bytes,25,opt,name=time_flow_rtt,json=timeFlowRtt,proto3" json:"time_flow_rtt,omitempty"` + AgentIp *IP `protobuf:"bytes,12,opt,name=agent_ip,json=agentIp,proto3" json:"agent_ip,omitempty"` + Flags uint32 `protobuf:"varint,13,opt,name=flags,proto3" json:"flags,omitempty"` + IcmpType uint32 `protobuf:"varint,14,opt,name=icmp_type,json=icmpType,proto3" json:"icmp_type,omitempty"` + IcmpCode uint32 `protobuf:"varint,15,opt,name=icmp_code,json=icmpCode,proto3" json:"icmp_code,omitempty"` + TcpDropBytes uint64 `protobuf:"varint,16,opt,name=tcp_drop_bytes,json=tcpDropBytes,proto3" json:"tcp_drop_bytes,omitempty"` + TcpDropPackets uint64 `protobuf:"varint,17,opt,name=tcp_drop_packets,json=tcpDropPackets,proto3" json:"tcp_drop_packets,omitempty"` + TcpDropLatestFlags uint32 `protobuf:"varint,18,opt,name=tcp_drop_latest_flags,json=tcpDropLatestFlags,proto3" json:"tcp_drop_latest_flags,omitempty"` + TcpDropLatestState uint32 `protobuf:"varint,19,opt,name=tcp_drop_latest_state,json=tcpDropLatestState,proto3" json:"tcp_drop_latest_state,omitempty"` + TcpDropLatestDropCause uint32 `protobuf:"varint,20,opt,name=tcp_drop_latest_drop_cause,json=tcpDropLatestDropCause,proto3" json:"tcp_drop_latest_drop_cause,omitempty"` + DnsId uint32 `protobuf:"varint,21,opt,name=dns_id,json=dnsId,proto3" json:"dns_id,omitempty"` + DnsFlags uint32 `protobuf:"varint,22,opt,name=dns_flags,json=dnsFlags,proto3" json:"dns_flags,omitempty"` + DnsLatency *durationpb.Duration `protobuf:"bytes,23,opt,name=dns_latency,json=dnsLatency,proto3" json:"dns_latency,omitempty"` + TimeFlowRtt *durationpb.Duration `protobuf:"bytes,24,opt,name=time_flow_rtt,json=timeFlowRtt,proto3" json:"time_flow_rtt,omitempty"` } func (x *Record) Reset() { @@ -380,16 +379,9 @@ func (x *Record) GetDnsFlags() uint32 { return 0 } -func (x *Record) GetTimeDnsReq() *timestamppb.Timestamp { +func (x *Record) GetDnsLatency() *durationpb.Duration { if x != nil { - return x.TimeDnsReq - } - return nil -} - -func (x *Record) GetTimeDnsRsp() *timestamppb.Timestamp { - if x != nil { - return x.TimeDnsRsp + return x.DnsLatency } return nil } @@ -669,7 +661,7 @@ var file_proto_flow_proto_rawDesc = []byte{ 0x07, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x28, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x62, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, - 0x65, 0x73, 0x22, 0xaf, 0x08, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x21, 0x0a, + 0x65, 0x73, 0x22, 0xef, 0x07, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x65, 0x74, 0x68, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x65, 0x74, 0x68, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, @@ -724,16 +716,12 @@ var file_proto_flow_proto_rawDesc = []byte{ 0x43, 0x61, 0x75, 0x73, 0x65, 0x12, 0x15, 0x0a, 0x06, 0x64, 0x6e, 0x73, 0x5f, 0x69, 0x64, 0x18, 0x15, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x64, 0x6e, 0x73, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x64, 0x6e, 0x73, 0x5f, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x16, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x08, 0x64, 0x6e, 0x73, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x12, 0x3c, 0x0a, 0x0c, 0x74, 0x69, 0x6d, - 0x65, 0x5f, 0x64, 0x6e, 0x73, 0x5f, 0x72, 0x65, 0x71, 0x18, 0x17, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x74, 0x69, 0x6d, - 0x65, 0x44, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x12, 0x3c, 0x0a, 0x0c, 0x74, 0x69, 0x6d, 0x65, 0x5f, - 0x64, 0x6e, 0x73, 0x5f, 0x72, 0x73, 0x70, 0x18, 0x18, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x44, - 0x6e, 0x73, 0x52, 0x73, 0x70, 0x12, 0x3d, 0x0a, 0x0d, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x66, 0x6c, - 0x6f, 0x77, 0x5f, 0x72, 0x74, 0x74, 0x18, 0x19, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, + 0x08, 0x64, 0x6e, 0x73, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x12, 0x3a, 0x0a, 0x0b, 0x64, 0x6e, 0x73, + 0x5f, 0x6c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x17, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x64, 0x6e, 0x73, 0x4c, 0x61, + 0x74, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x3d, 0x0a, 0x0d, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x66, 0x6c, + 0x6f, 0x77, 0x5f, 0x72, 0x74, 0x74, 0x18, 0x18, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0b, 0x74, 0x69, 0x6d, 0x65, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x74, 0x74, 0x22, 0x3c, 0x0a, 0x08, 0x44, 0x61, 0x74, 0x61, 0x4c, 0x69, 0x6e, 0x6b, @@ -801,18 +789,17 @@ var file_proto_flow_proto_depIdxs = []int32{ 5, // 5: pbflow.Record.network:type_name -> pbflow.Network 7, // 6: pbflow.Record.transport:type_name -> pbflow.Transport 6, // 7: pbflow.Record.agent_ip:type_name -> pbflow.IP - 8, // 8: pbflow.Record.time_dns_req:type_name -> google.protobuf.Timestamp - 8, // 9: pbflow.Record.time_dns_rsp:type_name -> google.protobuf.Timestamp - 9, // 10: pbflow.Record.time_flow_rtt:type_name -> google.protobuf.Duration - 6, // 11: pbflow.Network.src_addr:type_name -> pbflow.IP - 6, // 12: pbflow.Network.dst_addr:type_name -> pbflow.IP - 2, // 13: pbflow.Collector.Send:input_type -> pbflow.Records - 1, // 14: pbflow.Collector.Send:output_type -> pbflow.CollectorReply - 14, // [14:15] is the sub-list for method output_type - 13, // [13:14] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 9, // 8: pbflow.Record.dns_latency:type_name -> google.protobuf.Duration + 9, // 9: pbflow.Record.time_flow_rtt:type_name -> google.protobuf.Duration + 6, // 10: pbflow.Network.src_addr:type_name -> pbflow.IP + 6, // 11: pbflow.Network.dst_addr:type_name -> pbflow.IP + 2, // 12: pbflow.Collector.Send:input_type -> pbflow.Records + 1, // 13: pbflow.Collector.Send:output_type -> pbflow.CollectorReply + 13, // [13:14] is the sub-list for method output_type + 12, // [12:13] is the sub-list for method input_type + 12, // [12:12] is the sub-list for extension type_name + 12, // [12:12] is the sub-list for extension extendee + 0, // [0:12] is the sub-list for field type_name } func init() { file_proto_flow_proto_init() } diff --git a/proto/flow.proto b/proto/flow.proto index 1b15f0f69d8d56e0b1b75e75899fa059f13a98a0..18775e2356699cddb7ebe6a4435aa7b8ecb3fcf3 100644 --- a/proto/flow.proto +++ b/proto/flow.proto @@ -51,9 +51,8 @@ message Record { uint32 tcp_drop_latest_drop_cause = 20; uint32 dns_id = 21; uint32 dns_flags = 22; - google.protobuf.Timestamp time_dns_req = 23; - google.protobuf.Timestamp time_dns_rsp = 24; - google.protobuf.Duration time_flow_rtt = 25; + google.protobuf.Duration dns_latency = 23; + google.protobuf.Duration time_flow_rtt = 24; } message DataLink {