Unverified commit 6d9d2e7d authored by Dushyant Behl, committed by GitHub

NETOBSERV-1112: Fix a bug where RTT was sometimes not visible in flow logs (#159)


* Make RTT values show up on both flow directions

* Fix RTT calculation error for container hooks.

Signed-off-by: Dushyant Behl <dushyantbehl@hotmail.com>
Signed-off-by: Dushyant Behl <dushyantbehl@users.noreply.github.com>

---------

Signed-off-by: Dushyant Behl <dushyantbehl@hotmail.com>
Signed-off-by: Dushyant Behl <dushyantbehl@users.noreply.github.com>
parent af7d59da
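The core of the fix is that a freshly measured RTT is now also mirrored onto the reverse-direction flow entry (see reverse_flow_id_struct and update_reverse_flow_rtt in the diff below), so the value shows up on both the EGRESS and the INGRESS flow records instead of only one of them. A minimal userspace sketch of that idea in Go; the key and metrics types here are illustrative stand-ins, not the agent's real flow_id/flow_metrics structures:

package main

import "fmt"

// flowKey is an illustrative stand-in for the agent's flow_id (which also
// carries MACs, EtherType, transport protocol and ifindex).
type flowKey struct {
	SrcIP, DstIP     string
	SrcPort, DstPort uint16
	Direction        uint8 // 0 = INGRESS, 1 = EGRESS
}

type flowMetrics struct {
	FlowRTT uint64 // nanoseconds
}

// reverse builds the key of the opposite-direction flow, mirroring reverse_flow_id_struct.
func reverse(k flowKey) flowKey {
	return flowKey{
		SrcIP: k.DstIP, DstIP: k.SrcIP,
		SrcPort: k.DstPort, DstPort: k.SrcPort,
		Direction: 1 - k.Direction,
	}
}

// updateReverseFlowRTT copies a freshly measured RTT onto the reverse flow,
// keeping the maximum value seen, as update_reverse_flow_rtt does in the BPF code.
func updateReverseFlowRTT(flows map[flowKey]*flowMetrics, k flowKey, rtt uint64) {
	if rev, ok := flows[reverse(k)]; ok && rtt > rev.FlowRTT {
		rev.FlowRTT = rtt
	}
}

func main() {
	egress := flowKey{SrcIP: "10.0.0.1", DstIP: "10.0.0.2", SrcPort: 40000, DstPort: 443, Direction: 1}
	flows := map[flowKey]*flowMetrics{reverse(egress): {}}
	updateReverseFlowRTT(flows, egress, 1_200_000) // 1.2 ms measured on the egress record
	fmt.Println(flows[reverse(egress)].FlowRTT)    // prints 1200000
}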
@@ -97,7 +97,7 @@ static inline int trace_dns(struct sk_buff *skb) {
     if ((bpf_ntohs(dns.flags) & DNS_QR_FLAG) == 0) { /* dns query */
         fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), false);
         if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
             u64 ts = bpf_ktime_get_ns();
             bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
         }
         id.direction = EGRESS;
@@ -40,17 +40,14 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         return TC_ACT_OK;
     }
-    // Record the current time first.
-    u64 current_time = bpf_ktime_get_ns();
+    pkt_info pkt;
+    __builtin_memset(&pkt, 0, sizeof(pkt));
     flow_id id;
     __builtin_memset(&id, 0, sizeof(id));
-    pkt_info pkt;
-    __builtin_memset(&pkt, 0, sizeof(pkt));
+    pkt.current_ts = bpf_ktime_get_ns(); // Record the current time first.
     pkt.id = &id;
-    pkt.current_ts = current_time;

     void *data_end = (void *)(long)skb->data_end;
     void *data = (void *)(long)skb->data;
@@ -60,30 +57,28 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         return TC_ACT_OK;
     }
-    if (enable_rtt) {
-        // This is currently gated as its not to be enabled by default.
-        calculate_flow_rtt(&pkt, direction, data_end);
-    }
     //Set extra fields
     id.if_index = skb->ifindex;
     id.direction = direction;
+    // We calculate the RTT before looking up aggregated_flows map because we want
+    // to keep the critical section between map lookup and update consume minimum time.
+    if (enable_rtt) {
+        // This is currently not to be enabled by default.
+        calculate_flow_rtt(&pkt, direction, data_end);
+    }
     // TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
     // a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
     flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
     if (aggregate_flow != NULL) {
         aggregate_flow->packets += 1;
         aggregate_flow->bytes += skb->len;
-        aggregate_flow->end_mono_time_ts = current_time;
+        aggregate_flow->end_mono_time_ts = pkt.current_ts;
         aggregate_flow->flags |= pkt.flags;
         // Does not matter the gate. Will be zero if not enabled.
-        if (pkt.rtt > 0) {
-            /* Since RTT is calculated for few packets we need to check if it is non zero value then only we update
-             * the flow. If we remove this check a packet which fails to calculate RTT will override the previous valid
-             * RTT with 0.
-             */
+        if (pkt.rtt > aggregate_flow->flow_rtt) {
             aggregate_flow->flow_rtt = pkt.rtt;
         }
@@ -101,8 +96,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         flow_metrics new_flow = {
             .packets = 1,
             .bytes = skb->len,
-            .start_mono_time_ts = current_time,
-            .end_mono_time_ts = current_time,
+            .start_mono_time_ts = pkt.current_ts,
+            .end_mono_time_ts = pkt.current_ts,
             .flags = pkt.flags,
             .flow_rtt = pkt.rtt
         };
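In the two hunks above, the per-flow update now keeps the largest RTT seen (pkt.rtt > aggregate_flow->flow_rtt) instead of overwriting the stored value with any non-zero sample, and the start/end timestamps come from pkt.current_ts. A small Go sketch of that aggregation step, assuming a simplified metrics struct keyed by an opaque string:

package main

import "fmt"

// flowStats is a simplified stand-in for the agent's flow_metrics.
type flowStats struct {
	Packets         uint32
	Bytes           uint64
	StartMonoTimeTS uint64
	EndMonoTimeTS   uint64
	FlowRTT         uint64
}

// aggregate folds one packet into the per-flow entry, or creates a new one.
// Keeping the maximum RTT means a packet that carries no RTT sample (rtt == 0)
// can no longer overwrite a previously measured value with zero.
func aggregate(flows map[string]*flowStats, key string, pktLen, ts, rtt uint64) {
	if f, ok := flows[key]; ok {
		f.Packets++
		f.Bytes += pktLen
		f.EndMonoTimeTS = ts
		if rtt > f.FlowRTT {
			f.FlowRTT = rtt
		}
		return
	}
	flows[key] = &flowStats{Packets: 1, Bytes: pktLen, StartMonoTimeTS: ts, EndMonoTimeTS: ts, FlowRTT: rtt}
}

func main() {
	flows := map[string]*flowStats{}
	aggregate(flows, "flow-a", 1500, 100, 800_000) // first packet carries an RTT sample
	aggregate(flows, "flow-a", 400, 200, 0)        // a later packet without a sample keeps it
	fmt.Printf("%+v\n", *flows["flow-a"])
}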
@@ -10,9 +10,11 @@
 #include "utils.h"
 #include "maps_definition.h"

-static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, u8 reversed) {
+const u64 MIN_RTT = 50000; //50 micro seconds
+
+static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, bool reverse) {
     flow_id *id = pkt->id;
-    if (reversed) {
+    if (reverse) {
         __builtin_memcpy(seq_id->src_ip, id->dst_ip, IP_MAX_LEN);
         __builtin_memcpy(seq_id->dst_ip, id->src_ip, IP_MAX_LEN);
         seq_id->src_port = id->dst_port;
@@ -23,49 +25,104 @@ static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, u8 reversed) {
         seq_id->src_port = id->src_port;
         seq_id->dst_port = id->dst_port;
     }
+    seq_id->transport_protocol = id->transport_protocol;
     seq_id->seq_id = seq;
+    seq_id->if_index = id->if_index;
 }

-static __always_inline void calculate_flow_rtt_tcp(pkt_info *pkt, u8 direction, void *data_end, flow_seq_id *seq_id) {
-    struct tcphdr *tcp = (struct tcphdr *) pkt->l4_hdr;
-    if ( !tcp || ((void *)tcp + sizeof(*tcp) > data_end) ) {
-        return;
-    }
-
-    switch (direction) {
-    case EGRESS: {
-        if (IS_SYN_PACKET(pkt)) {
-            // Record the outgoing syn sequence number
-            u32 seq = bpf_ntohl(tcp->seq);
-            fill_flow_seq_id(seq_id, pkt, seq, 0);
-
-            long ret = bpf_map_update_elem(&flow_sequences, seq_id, &pkt->current_ts, BPF_ANY);
-            if (trace_messages && ret != 0) {
-                bpf_printk("err saving flow sequence record %d", ret);
-            }
-        }
-        break;
-    }
-    case INGRESS: {
-        if (IS_ACK_PACKET(pkt)) {
-            // Stored sequence should be ack_seq - 1
-            u32 seq = bpf_ntohl(tcp->ack_seq) - 1;
-            // check reversed flow
-            fill_flow_seq_id(seq_id, pkt, seq, 1);
-
-            u64 *prev_ts = (u64 *)bpf_map_lookup_elem(&flow_sequences, seq_id);
-            if (prev_ts != NULL) {
-                pkt->rtt = pkt->current_ts - *prev_ts;
-                // Delete the flow from flow sequence map so if it
-                // restarts we have a new RTT calculation.
-                long ret = bpf_map_delete_elem(&flow_sequences, seq_id);
-                if (trace_messages && ret != 0) {
-                    bpf_printk("error evicting flow sequence: %d", ret);
-                }
-            }
-        }
-        break;
-    }
+static __always_inline void reverse_flow_id_struct(flow_id *src, flow_id *dst) {
+    // Fields which remain same
+    dst->eth_protocol = src->eth_protocol;
+    dst->transport_protocol = src->transport_protocol;
+    dst->if_index = src->if_index;
+    // Fields which should be reversed
+    dst->direction = (src->direction == INGRESS) ? EGRESS : INGRESS;
+    __builtin_memcpy(dst->src_mac, src->dst_mac, ETH_ALEN);
+    __builtin_memcpy(dst->dst_mac, src->src_mac, ETH_ALEN);
+    __builtin_memcpy(dst->src_ip, src->dst_ip, IP_MAX_LEN);
+    __builtin_memcpy(dst->dst_ip, src->src_ip, IP_MAX_LEN);
+    dst->src_port = src->dst_port;
+    dst->dst_port = src->src_port;
+    /* ICMP type can be ignore for now. We only deal with TCP packets for now.*/
+}
+
+static __always_inline void update_reverse_flow_rtt(pkt_info *pkt, u32 seq) {
+    flow_id rev_flow_id;
+    __builtin_memset(&rev_flow_id, 0, sizeof(rev_flow_id));
+    reverse_flow_id_struct(pkt->id, &rev_flow_id);
+
+    flow_metrics *reverse_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &rev_flow_id);
+    if (reverse_flow != NULL) {
+        if (pkt->rtt > reverse_flow->flow_rtt) {
+            reverse_flow->flow_rtt = pkt->rtt;
+            long ret = bpf_map_update_elem(&aggregated_flows, &rev_flow_id, reverse_flow, BPF_EXIST);
+            if (trace_messages && ret != 0) {
+                bpf_printk("error updating rtt value in flow %d\n", ret);
+            }
+        }
+    }
+}
+
+static __always_inline void __calculate_tcp_rtt(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
+    // Stored sequence should be ack_seq - 1
+    u32 seq = bpf_ntohl(tcp->ack_seq) - 1;
+    // check reversed flow
+    fill_flow_seq_id(seq_id, pkt, seq, true);
+
+    u64 *prev_ts = (u64 *)bpf_map_lookup_elem(&flow_sequences, seq_id);
+    if (prev_ts != NULL) {
+        u64 rtt = pkt->current_ts - *prev_ts;
+        /**
+         * FIXME: Because of SAMPLING the way it is done if we miss one of SYN/SYN+ACK/ACK
+         * then we can get RTT values which are the process response time rather than actual RTT.
+         * This check below clears them out but needs to be modified with a better solution or change
+         * the algorithm for calculating RTT so it doesn't interact with SAMPLING like this.
+         */
+        if (rtt < MIN_RTT) {
+            return;
+        }
+        pkt->rtt = rtt;
+        // Delete the flow from flow sequence map so if it
+        // restarts we have a new RTT calculation.
+        long ret = bpf_map_delete_elem(&flow_sequences, seq_id);
+        if (trace_messages && ret != 0) {
+            bpf_printk("error evicting flow sequence: %d", ret);
+        }
+        // This is an ACK packet with valid sequence id so a SYN must
+        // have been sent. We can safely update the reverse flow RTT here.
+        update_reverse_flow_rtt(pkt, seq);
+    }
+    return;
+}
+
+static __always_inline void __store_tcp_ts(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
+    // store timestamp of syn packets.
+    u32 seq = bpf_ntohl(tcp->seq);
+    fill_flow_seq_id(seq_id, pkt, seq, false);
+    long ret = bpf_map_update_elem(&flow_sequences, seq_id, &pkt->current_ts, BPF_NOEXIST);
+    if (trace_messages && ret != 0) {
+        bpf_printk("err saving flow sequence record %d", ret);
+    }
+    return;
+}
+
+static __always_inline void calculate_flow_rtt_tcp(pkt_info *pkt, u8 direction, void *data_end, flow_seq_id *seq_id) {
+    struct tcphdr *tcp = (struct tcphdr *) pkt->l4_hdr;
+    if ( !tcp || ((void *)tcp + sizeof(*tcp) > data_end) ) {
+        return;
+    }
+
+    /* We calculate RTT for both SYN/SYN+ACK and SYN+ACK/ACK and take the maximum of both.*/
+    if (tcp->syn && tcp->ack) { // SYN ACK Packet
+        __calculate_tcp_rtt(pkt, tcp, seq_id);
+        __store_tcp_ts(pkt, tcp, seq_id);
+    }
+    else if (tcp->ack) {
+        __calculate_tcp_rtt(pkt, tcp, seq_id);
+    }
+    else if (tcp->syn) {
+        __store_tcp_ts(pkt, tcp, seq_id);
     }
 }
@@ -83,5 +140,4 @@ static __always_inline void calculate_flow_rtt(pkt_info *pkt, u8 direction, void
     }
 }
 #endif /* __RTT_TRACKER_H__ */
\ No newline at end of file
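The rewritten RTT tracker times the TCP handshake: a SYN or SYN+ACK stores its sequence number and timestamp in flow_sequences, and the matching ACK (looked up with the reversed tuple and ack_seq - 1) yields the RTT. Samples below MIN_RTT (50 microseconds) are discarded because, with sampling, a missed handshake packet can pair the wrong segments and measure application response time instead. A self-contained Go sketch of that matching logic, using an in-memory map in place of the BPF flow_sequences map; all names here are illustrative:

package main

import (
	"fmt"
	"time"
)

const minRTT = 50 * time.Microsecond // mirrors MIN_RTT; filters samples broken by sampling

// seqKey stands in for flow_seq_id: a 4-tuple plus the expected sequence number.
type seqKey struct {
	SrcIP, DstIP     string
	SrcPort, DstPort uint16
	Seq              uint32
}

type rttTracker struct{ sent map[seqKey]time.Time }

// onSyn records when a SYN (or SYN+ACK) with sequence number seq left this host.
func (t *rttTracker) onSyn(k seqKey, ts time.Time) { t.sent[k] = ts }

// onAck matches an incoming ACK against the stored SYN timestamp. The lookup key
// uses the reversed tuple and ack_seq-1, exactly like fill_flow_seq_id(..., true).
func (t *rttTracker) onAck(srcIP, dstIP string, srcPort, dstPort uint16, ackSeq uint32, ts time.Time) (time.Duration, bool) {
	k := seqKey{SrcIP: dstIP, DstIP: srcIP, SrcPort: dstPort, DstPort: srcPort, Seq: ackSeq - 1}
	sent, ok := t.sent[k]
	if !ok {
		return 0, false
	}
	rtt := ts.Sub(sent)
	if rtt < minRTT {
		return 0, false // likely a mispaired sample, drop it
	}
	delete(t.sent, k) // evict so a restarted handshake gets a fresh measurement
	return rtt, true
}

func main() {
	t := &rttTracker{sent: map[seqKey]time.Time{}}
	start := time.Now()
	// Outgoing SYN from 10.0.0.1:40000 to 10.0.0.2:443 with seq 1000.
	t.onSyn(seqKey{SrcIP: "10.0.0.1", DstIP: "10.0.0.2", SrcPort: 40000, DstPort: 443, Seq: 1000}, start)
	// The SYN+ACK from 10.0.0.2 acknowledges seq 1000 with ack_seq 1001.
	rtt, ok := t.onAck("10.0.0.2", "10.0.0.1", 443, 40000, 1001, start.Add(2*time.Millisecond))
	fmt.Println(rtt, ok)
}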
@@ -22,9 +22,6 @@
 #define FIN_ACK_FLAG 0x200
 #define RST_ACK_FLAG 0x400

-#define IS_SYN_PACKET(pkt) ((pkt->flags & SYN_FLAG) || (pkt->flags & SYN_ACK_FLAG))
-#define IS_ACK_PACKET(pkt) ((pkt->flags & ACK_FLAG) || (pkt->flags & SYN_ACK_FLAG))
-
 #if defined(__BYTE_ORDER__) && defined(__ORDER_LITTLE_ENDIAN__) && \
     __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
 #define bpf_ntohs(x) __builtin_bswap16(x)
@@ -124,7 +121,7 @@ typedef struct flow_id_t {
 // Force emitting struct flow_id into the ELF.
 const struct flow_id_t *unused2 __attribute__((unused));

-// Standard 4 tuple and a sequence identifier.
+// Standard 4 tuple, transport protocol and a sequence identifier.
 // No need to emit this struct. It's used only in kernel space
 typedef struct flow_seq_id_t {
     u16 src_port;

@@ -132,6 +129,8 @@ typedef struct flow_seq_id_t {
     u8 src_ip[IP_MAX_LEN];
     u8 dst_ip[IP_MAX_LEN];
     u32 seq_id;
+    u8 transport_protocol;
+    u32 if_index; // OS interface index
 } __attribute__((packed)) flow_seq_id;

 // Flow record is a tuple containing both flow identifier and metrics. It is used to send
@@ -72,7 +72,7 @@ func main() {
 	for records := range receivedRecords {
 		for _, record := range records.Entries {
 			if record.EthProtocol == ipv6 {
-				log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt %v\n",
+				log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
 					ipProto[record.EthProtocol],
 					record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
 					record.Interface,

@@ -91,10 +91,10 @@ func main() {
 					record.GetDnsId(),
 					record.GetDnsFlags(),
 					record.DnsLatency.AsDuration().Milliseconds(),
-					record.TimeFlowRtt.AsDuration().Microseconds(),
+					record.TimeFlowRtt.AsDuration().Nanoseconds(),
 				)
 			} else {
-				log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt %v\n",
+				log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
 					ipProto[record.EthProtocol],
 					record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
 					record.Interface,

@@ -113,7 +113,7 @@ func main() {
 					record.GetDnsId(),
 					record.GetDnsFlags(),
 					record.DnsLatency.AsDuration().Milliseconds(),
-					record.TimeFlowRtt.AsDuration().Microseconds(),
+					record.TimeFlowRtt.AsDuration().Nanoseconds(),
 				)
 			}
 		}
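The sample consumer now labels the field rtt(ns) and prints it in nanoseconds rather than microseconds, likely so that very small round-trip times are not truncated to zero in the output. A tiny Go example of the difference, using a plain time.Duration in place of the protobuf TimeFlowRtt field:

package main

import (
	"fmt"
	"time"
)

func main() {
	// A sub-microsecond RTT (possible on loopback or between local pods)
	// truncates to 0 when printed with Microseconds().
	rtt := 800 * time.Nanosecond
	fmt.Printf("rtt(us) %v\n", rtt.Microseconds()) // rtt(us) 0
	fmt.Printf("rtt(ns) %v\n", rtt.Nanoseconds())  // rtt(ns) 800
}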
@@ -64,11 +64,13 @@ type BpfFlowRecordT struct {
 }

 type BpfFlowSeqId struct {
 	SrcPort uint16
 	DstPort uint16
 	SrcIp   [16]uint8
 	DstIp   [16]uint8
 	SeqId   uint32
+	TransportProtocol uint8
+	IfIndex           uint32
 }

 type BpfPktDropsT struct {
@@ -64,11 +64,13 @@ type BpfFlowRecordT struct {
 }

 type BpfFlowSeqId struct {
 	SrcPort uint16
 	DstPort uint16
 	SrcIp   [16]uint8
 	DstIp   [16]uint8
 	SeqId   uint32
+	TransportProtocol uint8
+	IfIndex           uint32
 }

 type BpfPktDropsT struct {
@@ -101,6 +101,8 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
 	if enableRtt == 0 {
 		// Cannot set the size of map to be 0 so set it to 1.
 		spec.Maps[flowSequencesMap].MaxEntries = uint32(1)
+	} else {
+		log.Debugf("RTT calculations are enabled")
 	}

 	if !cfg.DNSTracker {
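When RTT tracking is disabled, the flow_sequences map still has to exist for the BPF object to load, so the loader shrinks it to a single entry; otherwise it now logs that RTT calculations are enabled. A short sketch of the same pattern with the cilium/ebpf library, assuming a map named flow_sequences in the collection spec (the map name, key size and value size are illustrative, and the real loader reads the spec from the compiled object instead of building it by hand):

package main

import (
	"log"

	"github.com/cilium/ebpf"
)

// tuneFlowSequences mirrors the loader tweak above: a BPF map cannot be sized
// to zero entries, so when RTT tracking is off the map is shrunk to one entry.
func tuneFlowSequences(spec *ebpf.CollectionSpec, enableRTT bool) {
	const flowSequencesMap = "flow_sequences" // assumed map name in the compiled object
	m, ok := spec.Maps[flowSequencesMap]
	if !ok {
		return
	}
	if !enableRTT {
		m.MaxEntries = 1
		return
	}
	log.Println("RTT calculations are enabled")
}

func main() {
	// A hand-built spec stands in for a spec loaded from the BPF ELF here.
	spec := &ebpf.CollectionSpec{Maps: map[string]*ebpf.MapSpec{
		"flow_sequences": {Type: ebpf.Hash, KeySize: 45, ValueSize: 8, MaxEntries: 1 << 16},
	}}
	tuneFlowSequences(spec, false)
	log.Println(spec.Maps["flow_sequences"].MaxEntries) // prints 1
}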