/*
Flows v2.
Flow monitor: A Flow-metric generator using TC.
This program can be hooked on to TC ingress/egress hook to monitor packets
to/from an interface.
Logic:
1) Store flow information in a per-cpu hash map.
2) Upon flow completion (tcp->fin event), evict the entry from map, and
send to userspace through ringbuffer.
Eviction of non-TCP flows needs to be done by userspace.
3) When the map is full, we send the new flow entry to userspace via ringbuffer,
until an entry is available.
4) When a hash collision is detected, we send the new entry to userspace via ringbuffer.
*/
#include <vmlinux.h>
#include <bpf_helpers.h>
#include "configs.h"
#include "utils.h"
/* Defines a packet drops statistics tracker,
which attaches at kfree_skb hook. Is optional.
*/
#include "pkt_drops.h"
/* Defines a dns tracker,
which attaches at net_dev_queue hook. Is optional.
*/
#include "dns_tracker.h"
/* Defines an rtt tracker,
which runs inside flow_monitor. Is optional.
*/
#include "rtt_tracker.h"
/*
 * Accounts one packet seen on a TC hook into the aggregated_flows map.
 *
 * skb:       packet buffer handed over by the TC hook.
 * direction: stored verbatim in the flow key (id.direction) and forwarded
 *            to the RTT tracker when enable_rtt is set.
 *
 * Every exit path returns TC_ACT_OK: this function only observes packets.
 *
 * If the flow key is already present in aggregated_flows, its counters are
 * updated in place. Otherwise a single-packet entry is inserted; when that
 * insertion fails, the single-packet record is pushed to the direct_flows
 * ring buffer instead, with the error code stored in metrics.errno.
 */
static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
    // If sampling is defined, will only parse 1 out of "sampling" flows
    if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
        return TC_ACT_OK;
    }

    pkt_info pkt;
    __builtin_memset(&pkt, 0, sizeof(pkt));

    // The key is zeroed with memset (not a partial initializer) so that every
    // byte of it is defined before it is used for map lookups.
    flow_id id;
    __builtin_memset(&id, 0, sizeof(id));

    pkt.current_ts = bpf_ktime_get_ns(); // Record the current time first.
    pkt.id = &id;

    void *data_end = (void *)(long)skb->data_end;
    void *data = (void *)(long)skb->data;
    struct ethhdr *eth = (struct ethhdr *)data;

    // NOTE(review): fill_ethhdr is defined elsewhere; presumably it parses the
    // headers into pkt (and, through pkt.id, into id) — confirm in utils.h.
    if (fill_ethhdr(eth, data_end, &pkt) == DISCARD) {
        return TC_ACT_OK;
    }

    // Set extra fields
    id.if_index = skb->ifindex;
    id.direction = direction;

    // We calculate the RTT before looking up aggregated_flows map because we want
    // to keep the critical section between map lookup and update consume minimum time.
    if (enable_rtt) {
        // This is currently not to be enabled by default.
        calculate_flow_rtt(&pkt, direction, data_end);
    }

    // TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
    // a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
    flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
    if (aggregate_flow != NULL) {
        aggregate_flow->packets += 1;
        aggregate_flow->bytes += skb->len;
        aggregate_flow->end_mono_time_ts = pkt.current_ts;
        // it might happen that start_mono_time hasn't been set due to
        // the way percpu hashmap deal with concurrent map entries
        if (aggregate_flow->start_mono_time_ts == 0) {
            aggregate_flow->start_mono_time_ts = pkt.current_ts;
        }
        aggregate_flow->flags |= pkt.flags;

        // No need to gate this on enable_rtt: pkt was zeroed above, so pkt.rtt
        // stays zero unless the RTT tracker wrote to it.
        if (pkt.rtt > aggregate_flow->flow_rtt) {
            aggregate_flow->flow_rtt = pkt.rtt;
        }

        long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
        if (trace_messages && ret != 0) {
            // usually error -16 (-EBUSY) is printed here.
            // In this case, the flow is dropped, as submitting it to the ringbuffer would cause
            // a duplicated UNION of flows (two different flows with partial aggregation of the same packets),
            // which can't be deduplicated.
            // other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
            bpf_printk("error updating flow %d\n", ret);
        }
    } else {
        // Key does not exist in the map, and will need to create a new entry.
        flow_metrics new_flow = {
            .packets = 1,
            // Count the first packet's bytes too, consistently with the
            // existing-flow branch, which adds skb->len for every packet.
            .bytes = skb->len,
            .start_mono_time_ts = pkt.current_ts,
            .end_mono_time_ts = pkt.current_ts,
            .flags = pkt.flags,
            .flow_rtt = pkt.rtt,
        };

        // even if we know that the entry is new, another CPU might be concurrently inserting a flow
        // so we need to specify BPF_ANY
        long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
        if (ret != 0) {
            // usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
            // In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
            // a repeated INTERSECTION of flows (different flows aggregating different packets),
            // which can be re-aggregated at userspace.
            // other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
            if (trace_messages) {
                bpf_printk("error adding flow %d\n", ret);
            }

            // Store the error as a positive value alongside the metrics.
            new_flow.errno = -ret;
            flow_record *record =
                (flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
            if (!record) {
                if (trace_messages) {
                    bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
                }
                return TC_ACT_OK;
            }
            record->id = id;
            record->metrics = new_flow;
            bpf_ringbuf_submit(record, 0);
        }
    }

    return TC_ACT_OK;
}
/* Program placed in the "tc_ingress" section: runs flow_monitor with the INGRESS direction. */
SEC("tc_ingress")
int ingress_flow_parse(struct __sk_buff *skb) {
    const int verdict = flow_monitor(skb, INGRESS);

    return verdict;
}
/* Program placed in the "tc_egress" section: runs flow_monitor with the EGRESS direction. */
SEC("tc_egress")
int egress_flow_parse(struct __sk_buff *skb) {
    const int verdict = flow_monitor(skb, EGRESS);

    return verdict;
}
// License string emitted into the "license" section of the object file.
// NOTE(review): presumably must stay GPL-compatible so the kernel permits the
// GPL-only helpers used by this program — confirm against bpf-helpers(7).
char _license[] SEC("license") = "GPL";