-
Mohamed S. Mahmoud authored
This reverts commit b6e2b873. fix Signed-off-by:
msherif1234 <mmahmoud@redhat.com>
Mohamed S. Mahmoud authoredThis reverts commit b6e2b873. fix Signed-off-by:
msherif1234 <mmahmoud@redhat.com>
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
flows.c 5.55 KiB
/*
Flows v2.
Flow monitor: A Flow-metric generator using TC.
This program can be hooked on to TC ingress/egress hook to monitor packets
to/from an interface.
Logic:
1) Store flow information in a per-cpu hash map.
2) Upon flow completion (tcp->fin event), evict the entry from map, and
send to userspace through ringbuffer.
Eviction for non-tcp flows needs to be done by userspace.
3) When the map is full, we send the new flow entry to userspace via ringbuffer,
until an entry is available.
4) When hash collision is detected, we send the new entry to userspace via ringbuffer.
*/
#include <vmlinux.h>
#include <bpf_helpers.h>
#include "configs.h"
#include "utils.h"
/* Defines a packet drops statistics tracker,
which attaches at kfree_skb hook. Is optional.
*/
#include "pkt_drops.h"
/* Defines a dns tracker,
which attaches at net_dev_queue hook. Is optional.
*/
#include "dns_tracker.h"
/* Defines an rtt tracker,
which runs inside flow_monitor. Is optional.
*/
#include "rtt_tracker.h"
/*
 * Accounts one packet seen on a TC hook into the aggregated_flows map.
 *
 * skb:       packet buffer handed over by the TC hook.
 * direction: value stored in the flow id and forwarded to the RTT tracker
 *            (the entry points in this file pass INGRESS or EGRESS).
 *
 * Every path returns TC_ACT_OK, whatever happens to the accounting.
 */
static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
    // Sampling gate: when "sampling" is non-zero, an invocation only goes on
    // if the random draw is a multiple of "sampling" (roughly 1 out of "sampling").
    if (sampling != 0) {
        if ((bpf_get_prandom_u32() % sampling) != 0) {
            return TC_ACT_OK;
        }
    }

    pkt_info pkt;
    flow_id id;
    __builtin_memset(&pkt, 0, sizeof(pkt));
    __builtin_memset(&id, 0, sizeof(id));

    // Take the timestamp before doing any parsing work.
    pkt.current_ts = bpf_ktime_get_ns();
    pkt.id = &id;

    void *pkt_start = (void *)(long)skb->data;
    void *pkt_end = (void *)(long)skb->data_end;
    if (fill_ethhdr((struct ethhdr *)pkt_start, pkt_end, &pkt) == DISCARD) {
        return TC_ACT_OK;
    }

    // Remaining key fields: interface index and traffic direction.
    id.direction = direction;
    id.if_index = skb->ifindex;

    // The RTT is computed ahead of the aggregated_flows lookup so that the
    // window between map lookup and map update stays as short as possible.
    // This is currently not to be enabled by default.
    if (enable_rtt) {
        calculate_flow_rtt(&pkt, direction, pkt_end);
    }

    // TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
    // a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
    flow_metrics *existing = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
    if (existing != NULL) {
        // Known flow: fold this packet into the stored metrics.
        existing->packets++;
        existing->bytes += skb->len;
        existing->end_mono_time_ts = pkt.current_ts;
        // The start time may still be unset because of the way the percpu
        // hashmap deals with concurrent map entries.
        if (existing->start_mono_time_ts == 0) {
            existing->start_mono_time_ts = pkt.current_ts;
        }
        existing->flags |= pkt.flags;
        // No need to check enable_rtt: pkt.rtt stays zero when RTT is disabled.
        if (existing->flow_rtt < pkt.rtt) {
            existing->flow_rtt = pkt.rtt;
        }

        long update_rc = bpf_map_update_elem(&aggregated_flows, &id, existing, BPF_ANY);
        if (trace_messages && update_rc != 0) {
            // Usually error -16 (-EBUSY) shows up here. The flow is dropped in
            // that case: pushing it to the ringbuffer would produce a duplicated
            // UNION of flows (two different flows partially aggregating the same
            // packets), which can't be deduplicated.
            // Other possible values:
            // https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
            bpf_printk("error updating flow %d\n", update_rc);
        }
        return TC_ACT_OK;
    }

    // No entry for this key yet: start a fresh single-packet flow.
    flow_metrics new_flow = {
        .packets = 1,
        .bytes = skb->len,
        .start_mono_time_ts = pkt.current_ts,
        .end_mono_time_ts = pkt.current_ts,
        .flags = pkt.flags,
        .flow_rtt = pkt.rtt
    };
    // BPF_ANY is required even though the entry looked new: another CPU might
    // be inserting the same flow concurrently.
    long insert_rc = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
    if (insert_rc == 0) {
        return TC_ACT_OK;
    }

    // Usually error -16 (-EBUSY) or -7 (E2BIG) shows up here. The single-packet
    // flow is forwarded through the ringbuffer instead: in the worst case this
    // yields a repeated INTERSECTION of flows (different flows aggregating
    // different packets), which can be re-aggregated at userspace.
    // Other possible values:
    // https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
    if (trace_messages) {
        bpf_printk("error adding flow %d\n", insert_rc);
    }
    new_flow.errno = -insert_rc;

    flow_record *rec = (flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(*rec), 0);
    if (rec == NULL) {
        if (trace_messages) {
            bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
        }
        return TC_ACT_OK;
    }
    rec->id = id;
    rec->metrics = new_flow;
    bpf_ringbuf_submit(rec, 0);
    return TC_ACT_OK;
}
// Program placed in the "tc_ingress" section: hands the packet to
// flow_monitor with the INGRESS direction and returns its result.
SEC("tc_ingress")
int ingress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb, INGRESS);
}
// Program placed in the "tc_egress" section: hands the packet to
// flow_monitor with the EGRESS direction and returns its result.
SEC("tc_egress")
int egress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb, EGRESS);
}
// License string emitted into the "license" section of the object file.
// NOTE(review): presumably needed so the kernel permits GPL-only helpers — confirm.
char _license[] SEC("license") = "GPL";