diff --git a/bpf/dns_tracker.h b/bpf/dns_tracker.h
index 97e124eaf9d35e99dc4e9adfcb05484a4416075d..48506eba82a4701b4f0a72a9b311795249173d63 100644
--- a/bpf/dns_tracker.h
+++ b/bpf/dns_tracker.h
@@ -97,7 +97,7 @@ static inline int trace_dns(struct sk_buff *skb) {
     if ((bpf_ntohs(dns.flags) & DNS_QR_FLAG) == 0) { /* dns query */
         fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), false);
         if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
-            u64 ts = bpf_ktime_get_ns();
+            u64 ts = bpf_ktime_get_ns();
             bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
         }
         id.direction = EGRESS;
diff --git a/bpf/flows.c b/bpf/flows.c
index 4aa3931fc5d4b3a24dfafe6e4fe6bbf8be4a50..373bc561fc578da3b5de480f4cff0382228cb0df 100644
--- a/bpf/flows.c
+++ b/bpf/flows.c
@@ -40,17 +40,14 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         return TC_ACT_OK;
     }
 
-    // Record the current time first.
-    u64 current_time = bpf_ktime_get_ns();
+    pkt_info pkt;
+    __builtin_memset(&pkt, 0, sizeof(pkt));
 
     flow_id id;
     __builtin_memset(&id, 0, sizeof(id));
 
-    pkt_info pkt;
-    __builtin_memset(&pkt, 0, sizeof(pkt));
-
+    pkt.current_ts = bpf_ktime_get_ns(); // Record the current time first.
    pkt.id = &id;
-    pkt.current_ts = current_time;
 
     void *data_end = (void *)(long)skb->data_end;
     void *data = (void *)(long)skb->data;
@@ -60,30 +57,28 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         return TC_ACT_OK;
     }
 
-    if (enable_rtt) {
-        // This is currently gated as its not to be enabled by default.
-        calculate_flow_rtt(&pkt, direction, data_end);
-    }
-
     //Set extra fields
     id.if_index = skb->ifindex;
     id.direction = direction;
 
+    // We calculate the RTT before looking up the aggregated_flows map because we want
+    // the critical section between map lookup and update to take as little time as possible.
+    if (enable_rtt) {
+        // This is currently not enabled by default.
+        calculate_flow_rtt(&pkt, direction, data_end);
+    }
+
     // TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
     // a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
     flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
     if (aggregate_flow != NULL) {
         aggregate_flow->packets += 1;
         aggregate_flow->bytes += skb->len;
-        aggregate_flow->end_mono_time_ts = current_time;
+        aggregate_flow->end_mono_time_ts = pkt.current_ts;
         aggregate_flow->flags |= pkt.flags;
 
         // Does not matter the gate. Will be zero if not enabled.
-        if (pkt.rtt > 0) {
-            /* Since RTT is calculated for few packets we need to check if it is non zero value then only we update
-             * the flow. If we remove this check a packet which fails to calculate RTT will override the previous valid
-             * RTT with 0.
-             */
+        if (pkt.rtt > aggregate_flow->flow_rtt) {
             aggregate_flow->flow_rtt = pkt.rtt;
         }
 
@@ -101,8 +96,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         flow_metrics new_flow = {
             .packets = 1,
             .bytes = skb->len,
-            .start_mono_time_ts = current_time,
-            .end_mono_time_ts = current_time,
+            .start_mono_time_ts = pkt.current_ts,
+            .end_mono_time_ts = pkt.current_ts,
             .flags = pkt.flags,
             .flow_rtt = pkt.rtt
         };
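
Note on the flows.c hunks above: the new `if (pkt.rtt > aggregate_flow->flow_rtt)` guard subsumes the old `pkt.rtt > 0` check and its comment. A packet whose RTT could not be computed carries rtt == 0 and can never overwrite a previously stored value, while larger samples win, matching the "keep the maximum" comment in rtt_tracker.h below. A minimal plain-C sketch of that update rule (userspace code with illustrative names, not the eBPF source):

    #include <assert.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef uint64_t u64;

    /* Stand-in for the flow_metrics field updated in flow_monitor(). */
    struct flow_metrics_sketch {
        u64 flow_rtt;
    };

    /* Mirrors the update rule: rtt == 0 (failed calculation) never
     * overwrites a stored value, and only larger samples are kept. */
    static void update_rtt(struct flow_metrics_sketch *m, u64 pkt_rtt) {
        if (pkt_rtt > m->flow_rtt) {
            m->flow_rtt = pkt_rtt;
        }
    }

    int main(void) {
        struct flow_metrics_sketch m = { .flow_rtt = 0 };
        update_rtt(&m, 120000); /* first valid sample (ns) is stored */
        update_rtt(&m, 0);      /* failed calculation: no overwrite with 0 */
        update_rtt(&m, 90000);  /* smaller sample is ignored */
        update_rtt(&m, 150000); /* larger sample wins */
        assert(m.flow_rtt == 150000);
        printf("flow_rtt=%llu ns\n", (unsigned long long)m.flow_rtt);
        return 0;
    }
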
diff --git a/bpf/rtt_tracker.h b/bpf/rtt_tracker.h
index d4c81b72b524d097ceb690c9d46a79d3268ca70f..a5cd98f8eadf4eeffb330359a5ab7d1ac5911d82 100644
--- a/bpf/rtt_tracker.h
+++ b/bpf/rtt_tracker.h
@@ -10,9 +10,11 @@
 #include "utils.h"
 #include "maps_definition.h"
 
-static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, u8 reversed) {
+const u64 MIN_RTT = 50000; // 50 microseconds
+
+static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, bool reverse) {
     flow_id *id = pkt->id;
-    if (reversed) {
+    if (reverse) {
         __builtin_memcpy(seq_id->src_ip, id->dst_ip, IP_MAX_LEN);
         __builtin_memcpy(seq_id->dst_ip, id->src_ip, IP_MAX_LEN);
         seq_id->src_port = id->dst_port;
@@ -23,49 +25,104 @@ static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt,
         seq_id->src_port = id->src_port;
         seq_id->dst_port = id->dst_port;
     }
+    seq_id->transport_protocol = id->transport_protocol;
     seq_id->seq_id = seq;
+    seq_id->if_index = id->if_index;
 }
 
-static __always_inline void calculate_flow_rtt_tcp(pkt_info *pkt, u8 direction, void *data_end, flow_seq_id *seq_id) {
-    struct tcphdr *tcp = (struct tcphdr *) pkt->l4_hdr;
-    if ( !tcp || ((void *)tcp + sizeof(*tcp) > data_end) ) {
-        return;
-    }
+static __always_inline void reverse_flow_id_struct(flow_id *src, flow_id *dst) {
+    // Fields which remain the same
+    dst->eth_protocol = src->eth_protocol;
+    dst->transport_protocol = src->transport_protocol;
+    dst->if_index = src->if_index;
+
+    // Fields which should be reversed
+    dst->direction = (src->direction == INGRESS) ? EGRESS : INGRESS;
+    __builtin_memcpy(dst->src_mac, src->dst_mac, ETH_ALEN);
+    __builtin_memcpy(dst->dst_mac, src->src_mac, ETH_ALEN);
+    __builtin_memcpy(dst->src_ip, src->dst_ip, IP_MAX_LEN);
+    __builtin_memcpy(dst->dst_ip, src->src_ip, IP_MAX_LEN);
+    dst->src_port = src->dst_port;
+    dst->dst_port = src->src_port;
+    /* The ICMP type can be ignored for now; we only deal with TCP packets. */
+}
 
-    switch (direction) {
-    case EGRESS: {
-        if (IS_SYN_PACKET(pkt)) {
-            // Record the outgoing syn sequence number
-            u32 seq = bpf_ntohl(tcp->seq);
-            fill_flow_seq_id(seq_id, pkt, seq, 0);
+static __always_inline void update_reverse_flow_rtt(pkt_info *pkt, u32 seq) {
+    flow_id rev_flow_id;
+    __builtin_memset(&rev_flow_id, 0, sizeof(rev_flow_id));
+    reverse_flow_id_struct(pkt->id, &rev_flow_id);
 
-            long ret = bpf_map_update_elem(&flow_sequences, seq_id, &pkt->current_ts, BPF_ANY);
+    flow_metrics *reverse_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &rev_flow_id);
+    if (reverse_flow != NULL) {
+        if (pkt->rtt > reverse_flow->flow_rtt) {
+            reverse_flow->flow_rtt = pkt->rtt;
+            long ret = bpf_map_update_elem(&aggregated_flows, &rev_flow_id, reverse_flow, BPF_EXIST);
             if (trace_messages && ret != 0) {
-                bpf_printk("err saving flow sequence record %d", ret);
+                bpf_printk("error updating rtt value in flow %d\n", ret);
             }
         }
-        break;
     }
-    case INGRESS: {
-        if (IS_ACK_PACKET(pkt)) {
-            // Stored sequence should be ack_seq - 1
-            u32 seq = bpf_ntohl(tcp->ack_seq) - 1;
-            // check reversed flow
-            fill_flow_seq_id(seq_id, pkt, seq, 1);
-
-            u64 *prev_ts = (u64 *)bpf_map_lookup_elem(&flow_sequences, seq_id);
-            if (prev_ts != NULL) {
-                pkt->rtt = pkt->current_ts - *prev_ts;
-                // Delete the flow from flow sequence map so if it
-                // restarts we have a new RTT calculation.
-                long ret = bpf_map_delete_elem(&flow_sequences, seq_id);
-                if (trace_messages && ret != 0) {
-                    bpf_printk("error evicting flow sequence: %d", ret);
-                }
-            }
+}
+
+static __always_inline void __calculate_tcp_rtt(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
+    // The stored sequence should be ack_seq - 1
+    u32 seq = bpf_ntohl(tcp->ack_seq) - 1;
+    // Check the reversed flow
+    fill_flow_seq_id(seq_id, pkt, seq, true);
+
+    u64 *prev_ts = (u64 *)bpf_map_lookup_elem(&flow_sequences, seq_id);
+    if (prev_ts != NULL) {
+        u64 rtt = pkt->current_ts - *prev_ts;
+        /**
+         * FIXME: Because of the way SAMPLING is done, if we miss one of SYN/SYN+ACK/ACK
+         * we can get RTT values that reflect the process response time rather than the actual RTT.
+         * The check below clears them out, but it needs a better solution, or the RTT
+         * algorithm has to change so it no longer interacts with SAMPLING like this.
+         */
+        if (rtt < MIN_RTT) {
+            return;
         }
-        break;
+        pkt->rtt = rtt;
+        // Delete the flow from the flow sequence map so that if it
+        // restarts we have a new RTT calculation.
+        long ret = bpf_map_delete_elem(&flow_sequences, seq_id);
+        if (trace_messages && ret != 0) {
+            bpf_printk("error evicting flow sequence: %d", ret);
+        }
+        // This is an ACK packet with a valid sequence id, so a SYN must
+        // have been sent. We can safely update the reverse flow RTT here.
+        update_reverse_flow_rtt(pkt, seq);
     }
+    return;
+}
+
+static __always_inline void __store_tcp_ts(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
+    // Store the timestamp of SYN packets.
+    u32 seq = bpf_ntohl(tcp->seq);
+    fill_flow_seq_id(seq_id, pkt, seq, false);
+    long ret = bpf_map_update_elem(&flow_sequences, seq_id, &pkt->current_ts, BPF_NOEXIST);
+    if (trace_messages && ret != 0) {
+        bpf_printk("err saving flow sequence record %d", ret);
+    }
+    return;
+}
+
+static __always_inline void calculate_flow_rtt_tcp(pkt_info *pkt, u8 direction, void *data_end, flow_seq_id *seq_id) {
+    struct tcphdr *tcp = (struct tcphdr *) pkt->l4_hdr;
+    if ( !tcp || ((void *)tcp + sizeof(*tcp) > data_end) ) {
+        return;
+    }
+
+    /* We calculate RTT for both SYN/SYN+ACK and SYN+ACK/ACK and keep the maximum of the two. */
+    if (tcp->syn && tcp->ack) { // SYN+ACK packet
+        __calculate_tcp_rtt(pkt, tcp, seq_id);
+        __store_tcp_ts(pkt, tcp, seq_id);
+    }
+    else if (tcp->ack) {
+        __calculate_tcp_rtt(pkt, tcp, seq_id);
+    }
+    else if (tcp->syn) {
+        __store_tcp_ts(pkt, tcp, seq_id);
     }
 }
@@ -83,5 +140,4 @@ static __always_inline void calculate_flow_rtt(pkt_info *pkt, u8 direction, void
     }
 }
 
-#endif /* __RTT_TRACKER_H__ */
-
+#endif /* __RTT_TRACKER_H__ */
\ No newline at end of file
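
Note on bpf/rtt_tracker.h above: the scheme stores a timestamp keyed by TCP sequence number when a SYN or SYN+ACK goes out (__store_tcp_ts) and, on the matching ACK, looks up ack_seq - 1 against the reversed flow to compute the delta (__calculate_tcp_rtt). The walk-through below is a plain-C sketch, not the eBPF source: an array stands in for the flow_sequences map, keys are simplified to the bare sequence number (the real key also carries the reversed tuple and interface), and the timestamps and sequence numbers are invented.

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef uint32_t u32;
    typedef uint64_t u64;

    #define MAX_SEQS 16
    static const u64 MIN_RTT = 50000; /* 50 microseconds, as in rtt_tracker.h */

    struct seq_entry { bool used; u32 seq; u64 ts; };
    static struct seq_entry flow_sequences[MAX_SEQS];

    /* Stand-in for bpf_map_update_elem(&flow_sequences, ..., BPF_NOEXIST). */
    static void store_ts(u32 seq, u64 ts) {
        for (int i = 0; i < MAX_SEQS; i++) {
            if (!flow_sequences[i].used) {
                flow_sequences[i] = (struct seq_entry){ true, seq, ts };
                return;
            }
        }
    }

    /* Stand-in for lookup + delete; returns 0 when nothing was stored or
     * when the delta is below MIN_RTT (the sampling guard in the FIXME). */
    static u64 calc_rtt(u32 acked_seq, u64 now) {
        for (int i = 0; i < MAX_SEQS; i++) {
            if (flow_sequences[i].used && flow_sequences[i].seq == acked_seq) {
                u64 rtt = now - flow_sequences[i].ts;
                flow_sequences[i].used = false; /* evict, like bpf_map_delete_elem */
                return rtt < MIN_RTT ? 0 : rtt;
            }
        }
        return 0;
    }

    int main(void) {
        /* Client SYN, seq=1000 at t=0 ns: the __store_tcp_ts path. */
        store_ts(1000, 0);
        /* Server SYN+ACK, ack_seq=1001 at t=230000 ns: the ACK bit looks up
         * ack_seq - 1 == 1000 (SYN -> SYN+ACK RTT); the SYN bit stores its
         * own seq=5000 for the second measurement. */
        u64 rtt1 = calc_rtt(1001 - 1, 230000);
        store_ts(5000, 230000);
        /* Client ACK, ack_seq=5001 at t=410000 ns: SYN+ACK -> ACK RTT. */
        u64 rtt2 = calc_rtt(5001 - 1, 410000);
        printf("SYN/SYN+ACK rtt=%llu ns, SYN+ACK/ACK rtt=%llu ns\n",
               (unsigned long long)rtt1, (unsigned long long)rtt2);
        return 0;
    }
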
diff --git a/bpf/types.h b/bpf/types.h
index dcc2a316be2b68b7a154106407e9edc9b070e557..db460886595ea086f6779a92fd053ac878cddb45 100644
--- a/bpf/types.h
+++ b/bpf/types.h
@@ -22,9 +22,6 @@
 #define FIN_ACK_FLAG 0x200
 #define RST_ACK_FLAG 0x400
 
-#define IS_SYN_PACKET(pkt) ((pkt->flags & SYN_FLAG) || (pkt->flags & SYN_ACK_FLAG))
-#define IS_ACK_PACKET(pkt) ((pkt->flags & ACK_FLAG) || (pkt->flags & SYN_ACK_FLAG))
-
 #if defined(__BYTE_ORDER__) && defined(__ORDER_LITTLE_ENDIAN__) && \
     __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
 #define bpf_ntohs(x) __builtin_bswap16(x)
@@ -124,7 +121,7 @@ typedef struct flow_id_t {
 // Force emitting struct flow_id into the ELF.
 const struct flow_id_t *unused2 __attribute__((unused));
 
-// Standard 4 tuple and a sequence identifier.
+// Standard 4-tuple, transport protocol, and a sequence identifier.
 // No need to emit this struct. It's used only in kernel space
 typedef struct flow_seq_id_t {
     u16 src_port;
@@ -132,6 +129,8 @@ typedef struct flow_seq_id_t {
     u8 src_ip[IP_MAX_LEN];
     u8 dst_ip[IP_MAX_LEN];
     u32 seq_id;
+    u8 transport_protocol;
+    u32 if_index; // OS interface index
 } __attribute__((packed)) flow_seq_id;
 
 // Flow record is a tuple containing both flow identifier and metrics. It is used to send
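
Note on bpf/types.h above: flow_seq_id is a BPF map key, and the generated Go structs (BpfFlowSeqId, further down) must see exactly the same bytes, which is why the struct stays packed after gaining the two new fields. A small compile-time check of the layout arithmetic (standalone C11, illustrative only):

    #include <assert.h>
    #include <stdint.h>

    typedef uint8_t u8;
    typedef uint16_t u16;
    typedef uint32_t u32;
    #define IP_MAX_LEN 16

    /* Copy of the updated flow_seq_id layout from bpf/types.h. */
    typedef struct flow_seq_id_t {
        u16 src_port;
        u16 dst_port;
        u8 src_ip[IP_MAX_LEN];
        u8 dst_ip[IP_MAX_LEN];
        u32 seq_id;
        u8 transport_protocol;
        u32 if_index; // OS interface index
    } __attribute__((packed)) flow_seq_id;

    /* packed => no padding: 2 + 2 + 16 + 16 + 4 + 1 + 4 = 45 bytes.
     * Without it, the u32 following a lone u8 would be aligned to a
     * 4-byte boundary and the kernel and userspace views of the key
     * could disagree on its size. */
    static_assert(sizeof(flow_seq_id) == 45, "unexpected flow_seq_id size");

    int main(void) { return 0; }
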
diff --git a/examples/flowlogs-dump/server/flowlogs-dump-collector.go b/examples/flowlogs-dump/server/flowlogs-dump-collector.go
index c8c6f2f66bd3f14448d31e271672d4a96fb65ae1..bed9028f3e348e05ffb39447bf73e76df55a4aba 100644
--- a/examples/flowlogs-dump/server/flowlogs-dump-collector.go
+++ b/examples/flowlogs-dump/server/flowlogs-dump-collector.go
@@ -72,7 +72,7 @@ func main() {
 	for records := range receivedRecords {
 		for _, record := range records.Entries {
 			if record.EthProtocol == ipv6 {
-				log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt %v\n",
+				log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
 					ipProto[record.EthProtocol],
 					record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
 					record.Interface,
@@ -91,10 +91,10 @@ func main() {
 					record.GetDnsId(),
 					record.GetDnsFlags(),
 					record.DnsLatency.AsDuration().Milliseconds(),
-					record.TimeFlowRtt.AsDuration().Microseconds(),
+					record.TimeFlowRtt.AsDuration().Nanoseconds(),
 				)
 			} else {
-				log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt %v\n",
+				log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
 					ipProto[record.EthProtocol],
 					record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
 					record.Interface,
@@ -113,7 +113,7 @@ func main() {
 					record.GetDnsId(),
 					record.GetDnsFlags(),
 					record.DnsLatency.AsDuration().Milliseconds(),
-					record.TimeFlowRtt.AsDuration().Microseconds(),
+					record.TimeFlowRtt.AsDuration().Nanoseconds(),
 				)
 			}
 		}
diff --git a/pkg/ebpf/bpf_bpfeb.go b/pkg/ebpf/bpf_bpfeb.go
index b2295c912d2efa819629194e72b50c74d9433617..bd6a95c7ca4ba7aa0ba3a892f45234583053a6a4 100644
--- a/pkg/ebpf/bpf_bpfeb.go
+++ b/pkg/ebpf/bpf_bpfeb.go
@@ -64,11 +64,13 @@ type BpfFlowRecordT struct {
 }
 
 type BpfFlowSeqId struct {
-	SrcPort uint16
-	DstPort uint16
-	SrcIp   [16]uint8
-	DstIp   [16]uint8
-	SeqId   uint32
+	SrcPort           uint16
+	DstPort           uint16
+	SrcIp             [16]uint8
+	DstIp             [16]uint8
+	SeqId             uint32
+	TransportProtocol uint8
+	IfIndex           uint32
 }
 
 type BpfPktDropsT struct {
diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o
index 96f86b75e0c0741ebfb37639f83ad9140471d613..b7d0979401747cc2a100c166a54cac0156314c5b 100644
Binary files a/pkg/ebpf/bpf_bpfeb.o and b/pkg/ebpf/bpf_bpfeb.o differ
diff --git a/pkg/ebpf/bpf_bpfel.go b/pkg/ebpf/bpf_bpfel.go
index 890682d5ba2688f58c5ea0e585e658d8e80de3e2..cfc06c8f0805991934bc4b881f6bc68db30f7f4a 100644
--- a/pkg/ebpf/bpf_bpfel.go
+++ b/pkg/ebpf/bpf_bpfel.go
@@ -64,11 +64,13 @@ type BpfFlowRecordT struct {
 }
 
 type BpfFlowSeqId struct {
-	SrcPort uint16
-	DstPort uint16
-	SrcIp   [16]uint8
-	DstIp   [16]uint8
-	SeqId   uint32
+	SrcPort           uint16
+	DstPort           uint16
+	SrcIp             [16]uint8
+	DstIp             [16]uint8
+	SeqId             uint32
+	TransportProtocol uint8
+	IfIndex           uint32
 }
 
 type BpfPktDropsT struct {
diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o
index 91758a5b344832ea6dad8ff7b92cd097a515c560..6cc2976532c7ce9021e109fa087c940ab959b188 100644
Binary files a/pkg/ebpf/bpf_bpfel.o and b/pkg/ebpf/bpf_bpfel.o differ
diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go
index 0db9308c276f1e0b350a384177247d017851b230..acdea31ec4d73ffe92129a6fd0c74930e8c06f5d 100644
--- a/pkg/ebpf/tracer.go
+++ b/pkg/ebpf/tracer.go
@@ -101,6 +101,8 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
 	if enableRtt == 0 {
 		// Cannot set the size of map to be 0 so set it to 1.
 		spec.Maps[flowSequencesMap].MaxEntries = uint32(1)
+	} else {
+		log.Debugf("RTT calculations are enabled")
 	}
 
 	if !cfg.DNSTracker {
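
Closing note: the flow_sequences map that tracer.go resizes here is declared in bpf/maps_definition.h, which this diff does not touch. The sketch below is a hypothetical reconstruction of such a declaration, shown only to make clear what MaxEntries the loader is overriding; all names and sizes in it are assumptions, not the repository's actual code.

    #include <linux/types.h>
    #include <linux/bpf.h>
    #include <bpf/bpf_helpers.h>

    typedef __u8  u8;
    typedef __u16 u16;
    typedef __u32 u32;
    typedef __u64 u64;
    #define IP_MAX_LEN 16

    /* Packed key from bpf/types.h (see above). */
    typedef struct flow_seq_id_t {
        u16 src_port;
        u16 dst_port;
        u8 src_ip[IP_MAX_LEN];
        u8 dst_ip[IP_MAX_LEN];
        u32 seq_id;
        u8 transport_protocol;
        u32 if_index;
    } __attribute__((packed)) flow_seq_id;

    /* Hypothetical declaration: a hash keyed by flow_seq_id whose value is the
     * bpf_ktime_get_ns() timestamp stored by __store_tcp_ts(). A BPF map cannot
     * be created with 0 entries, so when RTT tracking is off the Go loader
     * shrinks max_entries to 1 instead (see the tracer.go hunk above). */
    struct {
        __uint(type, BPF_MAP_TYPE_HASH);
        __type(key, flow_seq_id);
        __type(value, u64);
        __uint(max_entries, 1 << 16); /* assumed default; overridden at load time */
    } flow_sequences SEC(".maps");
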