    /*
        Flows v2.
        Flow monitor: A flow-metric generator using TC.

        This program can be attached to the TC ingress/egress hooks to monitor packets
        to/from an interface.

        Logic:

            1) Store flow information in a hash map.
            2) Periodically evict entries from the map from userspace.
            3) When the map is full/busy, send the new flow entry to userspace via the
               ring buffer instead.
    */
    
    #include <vmlinux.h>
    #include <bpf_helpers.h>
    #include "configs.h"
    
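    /*
     * Defines a packet-drop statistics tracker,
     * which attaches to the kfree_skb hook. Optional.
     */

    /*
     * Defines a DNS tracker,
     * which attaches to the net_dev_queue hook. Optional.
     */

    /*
     * Defines an RTT tracker,
     * which runs inside flow_monitor. Optional.
     */

    /*
     * Defines a Packet Capture Agent (PCA) tracker.
     * It is enabled by setting the env var ENABLE_PCA=true. Optional.
     */

    /*
     * NOTE(review): in this copy of the file none of the four comments above is
     * followed by an #include; confirm the corresponding tracker headers are
     * actually included (flow_monitor uses e.g. track_dns_packet).
     */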
    
    /* Do flow filtering. Is optional. */
    #include "flows_filter.h"
    
    /*
     * Defines a network events monitoring tracker,
     * which runs inside flow_monitor. Optional.
     */
    #include "network_events_monitoring.h"
    
    /*
     * Defines a packet translation tracker.
     */
    #include "pkt_translation.h"
    
    /*
     * Record that a flow was also observed on interface `if_index` in `direction`.
     *
     * If `if_index` is already recorded in value->observed_intf, its entry in
     * value->observed_direction is upgraded to OBSERVED_DIRECTION_BOTH when the new
     * direction differs. Otherwise the interface is appended to both arrays and
     * value->nb_observed_intf is incremented.
     *
     * The search for an already-recorded interface runs BEFORE the capacity check.
     * A full array therefore does not make a known interface count as "missed", and
     * does not block its direction upgrade.
     *
     * `pkt` is currently unused (kept for interface compatibility).
     *
     * Returns 0 on success, 1 if the interface is new and the array is full.
     */
    static __always_inline int add_observed_intf(flow_metrics *value, pkt_info *pkt, u32 if_index,
                                                 u8 direction) {
        u32 count = value->nb_observed_intf;
        // The compile-time bound keeps the loop provably finite (helps the BPF verifier),
        // even though `count` itself is read from the flow entry.
        for (u8 i = 0; i < MAX_OBSERVED_INTERFACES && i < count; i++) {
            if (value->observed_intf[i] == if_index) {
                if (value->observed_direction[i] != direction &&
                    value->observed_direction[i] != OBSERVED_DIRECTION_BOTH) {
                    // Same interface seen on a different direction => mark as both directions
                    value->observed_direction[i] = OBSERVED_DIRECTION_BOTH;
                }
                // Interface already seen -> nothing to add
                return 0;
            }
        }
        if (count >= MAX_OBSERVED_INTERFACES) {
            // New interface, but no room left to record it
            return 1;
        }
        value->observed_intf[count] = if_index;
        value->observed_direction[count] = direction;
        value->nb_observed_intf = count + 1;
        return 0;
    }
    
    /*
     * Fold one more packet into a flow entry that already exists in the map.
     *
     * The packet/byte counters, DSCP and sampling are only touched when the packet
     * comes from the interface the flow was first seen on; this avoids counting the
     * same packet once per traversed interface. A packet seen on another, non-zero
     * interface only refreshes the end timestamp and flags, and records that
     * interface through add_observed_intf(). A packet with if_index == 0 on a
     * different interface leaves the entry untouched.
     *
     * All writes to the entry happen while holding aggregate_flow->lock. If the
     * observed-interface array was full, a debug message is printed and (for
     * non-zero transport protocols) OBSERVED_INTF_MISSED is incremented.
     */
    static __always_inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt,
                                                     u64 len, u32 sampling, u32 if_index,
                                                     u8 direction) {
        int intf_capacity_hit = 0;

        bpf_spin_lock(&aggregate_flow->lock);
        int from_first_intf = aggregate_flow->if_index_first_seen == if_index;
        if (from_first_intf || if_index != 0) {
            // Every accepted packet extends the flow's lifetime and contributes its flags
            aggregate_flow->end_mono_time_ts = pkt->current_ts;
            aggregate_flow->flags |= pkt->flags;
            if (from_first_intf) {
                aggregate_flow->packets += 1;
                aggregate_flow->bytes += len;
                aggregate_flow->dscp = pkt->dscp;
                aggregate_flow->sampling = sampling;
            } else {
                intf_capacity_hit = add_observed_intf(aggregate_flow, pkt, if_index, direction);
            }
        }
        bpf_spin_unlock(&aggregate_flow->lock);

        if (intf_capacity_hit <= 0) {
            return;
        }
        BPF_PRINTK("observed interface missed (array capacity reached); ifindex=%d, eth_type=%d, "
                   "proto=%d, sport=%d, dport=%d\n",
                   if_index, aggregate_flow->eth_protocol, pkt->id->transport_protocol,
                   pkt->id->src_port, pkt->id->dst_port);
        // Zero-protocol traffic very likely hits the interface cap anyway: don't count it
        if (pkt->id->transport_protocol == 0) {
            return;
        }
        increase_counter(OBSERVED_INTF_MISSED);
    }
    
    /*
     * Merge the DNS information carried by the current packet into an existing
     * additional-metrics entry.
     *
     * The end timestamp and the DNS id/flags/latency are only overwritten when the
     * packet carries a DNS id (pkt->dns_id != 0). A non-zero dns_errno (the value
     * returned by DNS tracking) is recorded independently of that.
     */
    static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, int dns_errno) {
        if (pkt->dns_id != 0) {
            extra_metrics->end_mono_time_ts = pkt->current_ts;
            extra_metrics->dns_record.id = pkt->dns_id;
            extra_metrics->dns_record.flags = pkt->dns_flags;
            extra_metrics->dns_record.latency = pkt->dns_latency;
        }
        if (dns_errno != 0) {
            extra_metrics->dns_record.errno = dns_errno;
        }
    }
    
    /*
     * Core TC hook logic shared by the ingress/egress entry points.
     *
     * Steps:
     *   1. Random sampling: done up front when no filter-specific sampling is
     *      configured, otherwise after flow filtering with the filter's rate.
     *   2. Parse the Ethernet header with fill_ethhdr() (which presumably also fills
     *      the flow key through pkt.id -- confirm in its definition).
     *   3. Optional flow filtering via check_and_do_flow_filtering().
     *   4. Optional DNS tracking.
     *   5. Create or update the flow entry in `aggregated_flows`. If creating it fails
     *      with anything other than -EEXIST (usually -EBUSY / -E2BIG), the
     *      single-packet flow is sent to userspace through the `direct_flows` ring
     *      buffer instead.
     *   6. Create or update DNS data in `additional_flow_metrics` when relevant.
     *
     * Every path returns TC_ACT_OK, so this function never drops the packet.
     */
    static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
        if (!has_filter_sampling) {
            // When no filter sampling is defined, run the sampling check at the earliest for better performances
            // If sampling is defined, will only parse 1 out of "sampling" flows
            if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
                do_sampling = 0;
                return TC_ACT_OK;
            }
            do_sampling = 1;
        }

        // `id` is used as a map key and copied into ring buffer records, so it (and
        // `pkt`, which points to it) must be fully initialized, padding included.
        pkt_info pkt;
        __builtin_memset(&pkt, 0, sizeof(pkt));
        flow_id id;
        __builtin_memset(&id, 0, sizeof(id));

        pkt.current_ts = bpf_ktime_get_ns(); // Record the current time first.
        pkt.id = &id;

        void *data_end = (void *)(long)skb->data_end;
        void *data = (void *)(long)skb->data;
        struct ethhdr *eth = (struct ethhdr *)data;
        u64 len = skb->len;
        u16 eth_protocol = 0;

        if (fill_ethhdr(eth, data_end, &pkt, &eth_protocol) == DISCARD) {
            return TC_ACT_OK;
        }

        // check if this packet need to be filtered if filtering feature is enabled
        u32 filter_sampling = 0;
        bool skip =
            check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling, direction);
        if (has_filter_sampling) {
            if (filter_sampling == 0) {
                filter_sampling = sampling;
            }
            // If sampling is defined, will only parse 1 out of "sampling" flows
            if (filter_sampling > 1 && (bpf_get_prandom_u32() % filter_sampling) != 0) {
                do_sampling = 0;
                return TC_ACT_OK;
            }
            do_sampling = 1;
        }
        if (skip) {
            return TC_ACT_OK;
        }

        int dns_errno = 0;
        if (enable_dns_tracking) {
            dns_errno = track_dns_packet(skb, &pkt);
        }

        flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
        if (aggregate_flow != NULL) {
            update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex, direction);
        } else {
            // Key does not exist in the map, and will need to create a new entry.
            flow_metrics new_flow;
            __builtin_memset(&new_flow, 0, sizeof(new_flow));
            new_flow.if_index_first_seen = skb->ifindex;
            new_flow.direction_first_seen = direction;
            new_flow.packets = 1;
            new_flow.bytes = len;
            new_flow.eth_protocol = eth_protocol;
            new_flow.start_mono_time_ts = pkt.current_ts;
            new_flow.end_mono_time_ts = pkt.current_ts;
            new_flow.flags = pkt.flags;
            new_flow.dscp = pkt.dscp;
            new_flow.sampling = filter_sampling;
            __builtin_memcpy(new_flow.dst_mac, eth->h_dest, ETH_ALEN);
            __builtin_memcpy(new_flow.src_mac, eth->h_source, ETH_ALEN);

            long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
            if (ret == -EEXIST) {
                // The entry was created concurrently between our lookup and insert: merge into it.
                aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
                if (aggregate_flow != NULL) {
                    update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex,
                                         direction);
                } else {
                    if (trace_messages) {
                        bpf_printk("failed to update an exising flow\n");
                    }
                    // Update global counter for hashmap update errors
                    increase_counter(HASHMAP_FLOWS_DROPPED);
                }
            } else if (ret != 0) {
                if (trace_messages) {
                    bpf_printk("error adding flow %ld\n", ret);
                }
                // usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
                // In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
                // a repeated INTERSECTION of flows (different flows aggregating different packets),
                // which can be re-aggregated at userpace.
                // other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
                new_flow.errno = -ret;
                flow_record *record =
                    (flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
                if (!record) {
                    if (trace_messages) {
                        bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
                    }
                    // Note: returning here also skips the DNS metrics update below.
                    return TC_ACT_OK;
                }
                record->id = id;
                record->metrics = new_flow;
                bpf_ringbuf_submit(record, 0);
            }
        }

        // Update additional metrics (per-CPU map)
        if (pkt.dns_id != 0 || dns_errno != 0) {
            additional_metrics *extra_metrics =
                (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
            if (extra_metrics != NULL) {
                update_dns(extra_metrics, &pkt, dns_errno);
            } else {
                additional_metrics new_metrics;
                __builtin_memset(&new_metrics, 0, sizeof(new_metrics));
                new_metrics.start_mono_time_ts = pkt.current_ts;
                new_metrics.end_mono_time_ts = pkt.current_ts;
                new_metrics.eth_protocol = eth_protocol;
                new_metrics.dns_record.id = pkt.dns_id;
                new_metrics.dns_record.flags = pkt.dns_flags;
                new_metrics.dns_record.latency = pkt.dns_latency;
                new_metrics.dns_record.errno = dns_errno;

                long ret =
                    bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
                if (ret == -EEXIST) {
                    // Concurrent write from another CPU; retry
                    extra_metrics =
                        (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
                    if (extra_metrics != NULL) {
                        update_dns(extra_metrics, &pkt, dns_errno);
                    } else {
                        if (trace_messages) {
                            bpf_printk("failed to update DNS\n");
                        }
                        increase_counter(HASHMAP_FAIL_UPDATE_DNS);
                    }
                } else if (ret != 0) {
                    if (trace_messages) {
                        bpf_printk("error adding DNS %ld\n", ret);
                    }
                    increase_counter(HASHMAP_FAIL_UPDATE_DNS);
                }
            }
        }

        return TC_ACT_OK;
    }
    
    /*
     * TC ingress entry point: runs flow_monitor() for the ingress direction and
     * returns its verdict unchanged.
     * NOTE(review): no SEC() annotation is visible on this program in this copy of
     * the file; confirm the loader assigns it to the intended TC section.
     */
    int tc_ingress_flow_parse(struct __sk_buff *skb) {
        return flow_monitor(skb, INGRESS);
    }
    
    /*
     * TC egress entry point: delegates to flow_monitor() for the egress direction
     * and hands its verdict straight back to TC.
     */
    int tc_egress_flow_parse(struct __sk_buff *skb) {
        int verdict = flow_monitor(skb, EGRESS);
        return verdict;
    }
    
    /*
     * TCX ingress entry point. flow_monitor()'s verdict is deliberately discarded;
     * TCX_NEXT is returned so this program can coexist with other TCX hooks.
     */
    int tcx_ingress_flow_parse(struct __sk_buff *skb) {
        (void)flow_monitor(skb, INGRESS);
        return TCX_NEXT;
    }
    
    
    /*
     * TCX egress entry point. flow_monitor()'s verdict is deliberately discarded;
     * TCX_NEXT is returned so this program can coexist with other TCX hooks.
     */
    int tcx_egress_flow_parse(struct __sk_buff *skb) {
        (void)flow_monitor(skb, EGRESS);
        return TCX_NEXT;
    }
    
    
    // Program license string, placed in the "license" section via SEC("license").
    char _license[] SEC("license") = "GPL";