diff --git a/bpf/configs.h b/bpf/configs.h
index cdd9d58dcd7f73a62635bec4c8a5e49aae5b0bb2..fcf55db558282f829d2183f9fcab69bc008a31e2 100644
--- a/bpf/configs.h
+++ b/bpf/configs.h
@@ -8,5 +8,5 @@ volatile const u8 trace_messages = 0;
 volatile const u8 enable_rtt = 0;
 volatile const u16 pca_port = 0;
 volatile const u8 pca_proto = 0;
-
+volatile const u8 enable_dns_tracking = 0;
 #endif //__CONFIGS_H__
diff --git a/bpf/dns_tracker.h b/bpf/dns_tracker.h
index 48506eba82a4701b4f0a72a9b311795249173d63..5715269583793937add41fbc5e673b55d25b52a0 100644
--- a/bpf/dns_tracker.h
+++ b/bpf/dns_tracker.h
@@ -1,5 +1,5 @@
 /*
-    light weight DNS tracker using trace points.
+    lightweight DNS tracker.
 */
 
 #ifndef __DNS_TRACKER_H__
@@ -19,27 +19,6 @@ struct dns_header {
     u16 arcount;
 };
 
-static inline void find_or_create_dns_flow(flow_id *id, struct dns_header *dns, int len, u16 flags, u64 latency) {
-    flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
-    u64 current_time = bpf_ktime_get_ns();
-    // net_dev_queue trace point hook will run before TC hooks, so the flow shouldn't exists, if it does
-    // that indicates we have a stale DNS query/response or in the middle of TCP flow so we will do nothing
-    if (aggregate_flow == NULL) {
-        // there is no matching flows so lets create new one and dns info
-        flow_metrics new_flow;
-        __builtin_memset(&new_flow, 0, sizeof(new_flow));
-        new_flow.start_mono_time_ts = current_time;
-        new_flow.end_mono_time_ts = current_time;
-        new_flow.packets = 1;
-        new_flow.bytes = len;
-        new_flow.flags = flags;
-        new_flow.dns_record.id = bpf_ntohs(dns->id);
-        new_flow.dns_record.flags = bpf_ntohs(dns->flags);
-        new_flow.dns_record.latency = latency;
-        bpf_map_update_elem(&aggregated_flows, id, &new_flow, BPF_ANY);
-    }
-}
-
 static inline void fill_dns_id (flow_id *id, dns_flow_id *dns_flow, u16 dns_id, bool reverse) {
     dns_flow->id = dns_id;
     dns_flow->protocol = id->transport_protocol;
@@ -56,73 +35,72 @@ static inline void fill_dns_id (flow_id *id, dns_flow_id *dns_flow, u16 dns_id,
     }
 }
 
-static inline int trace_dns(struct sk_buff *skb) {
-    flow_id id;
-    u8 protocol = 0;
-    u16 family = 0,flags = 0, len = 0;
-
-    __builtin_memset(&id, 0, sizeof(id));
+static __always_inline u8 calc_dns_header_offset(pkt_info *pkt, void *data_end) {
+    u8 len = 0;
+    switch (pkt->id->transport_protocol) {
+    case IPPROTO_TCP: {
+        struct tcphdr *tcp = (struct tcphdr *) pkt->l4_hdr;
+        if (!tcp || ((void *)tcp + sizeof(*tcp) > data_end)) {
+            return 0;
+        }
+        len = tcp->doff * sizeof(u32) + 2; // DNS over TCP has 2 bytes of length at the beginning
+        break;
+    }
+    case IPPROTO_UDP: {
+        struct udphdr *udp = (struct udphdr *) pkt->l4_hdr;
+        if (!udp || ((void *)udp + sizeof(*udp) > data_end)) {
+            return 0;
+        }
+        len = bpf_ntohs(udp->len);
+        // make sure udp payload doesn't exceed max msg size
+        if (len - sizeof(struct udphdr) > UDP_MAXMSG) {
+            return 0;
+        }
+        // set the length to udp hdr size as it will be used to locate dns header
+        len = sizeof(struct udphdr);
+        break;
+    }
+    }
+    return len;
+}
 
-    id.if_index = skb->skb_iif;
+static __always_inline void track_dns_packet(struct __sk_buff *skb, pkt_info *pkt) {
+    void *data_end = (void *)(long)skb->data_end;
+    if (pkt->id->dst_port == DNS_PORT || pkt->id->src_port == DNS_PORT) {
+        dns_flow_id dns_req;
 
-    // read L2 info
-    set_key_with_l2_info(skb, &id, &family);
+        u8 len = calc_dns_header_offset(pkt, data_end);
+        if (!len) {
+            return;
+        }
 
-    // read L3 info
-    set_key_with_l3_info(skb, family, &id, &protocol);
+        struct dns_header dns;
+        u32 dns_offset = (long)pkt->l4_hdr - (long)skb->data + len;
 
-    switch (protocol) {
-    case IPPROTO_UDP:
-        len = set_key_with_udp_info(skb, &id, IPPROTO_UDP);
-        // make sure udp payload doesn't exceed max msg size
-        if (len - sizeof(struct udphdr) > UDP_MAXMSG) {
-            return -1;
+        if (bpf_skb_load_bytes(skb, dns_offset, &dns, sizeof(dns)) < 0) {
+            return;
         }
-        // set the length to udp hdr size as it will be used below to locate dns header
-        len = sizeof(struct udphdr);
-        break;
-    case IPPROTO_TCP:
-        len = set_key_with_tcp_info(skb, &id, IPPROTO_TCP, &flags);
-        break;
-    default:
-        return -1;
-    }
 
-    // check for DNS packets
-    if (id.dst_port == DNS_PORT || id.src_port == DNS_PORT) {
-        struct dns_header dns;
-        dns_flow_id dns_req;
-        u64 latency = 0;
-        bpf_probe_read(&dns, sizeof(dns), (struct dns_header *)(skb->head + skb->transport_header + len));
-        if ((bpf_ntohs(dns.flags) & DNS_QR_FLAG) == 0) { /* dns query */
-            fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), false);
+        u16 dns_id = bpf_ntohs(dns.id);
+        u16 flags = bpf_ntohs(dns.flags);
+        u64 ts = bpf_ktime_get_ns();
+
+        if ((flags & DNS_QR_FLAG) == 0) { /* dns query */
+            fill_dns_id(pkt->id, &dns_req, dns_id, false);
             if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
-                u64 ts = bpf_ktime_get_ns();
                 bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
             }
-            id.direction = EGRESS;
         } else { /* dns response */
-            id.direction = INGRESS;
-            fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), true);
+            fill_dns_id(pkt->id, &dns_req, dns_id, true);
             u64 *value = bpf_map_lookup_elem(&dns_flows, &dns_req);
             if (value != NULL) {
-                latency = bpf_ktime_get_ns() - *value;
+                pkt->dns_latency = ts - *value;
+                pkt->dns_id = dns_id;
+                pkt->dns_flags = flags;
                 bpf_map_delete_elem(&dns_flows, &dns_req);
-                find_or_create_dns_flow(&id, &dns, skb->len, flags, latency);
            }
         } // end of dns response
     } // end of dns port check
-
-    return 0;
-}
-
-SEC("tracepoint/net/net_dev_queue")
-int trace_net_packets(struct trace_event_raw_net_dev_template *args) {
-    struct sk_buff skb;
-
-    __builtin_memset(&skb, 0, sizeof(skb));
-    bpf_probe_read(&skb, sizeof(struct sk_buff), args->skbaddr);
-    return trace_dns(&skb);
-}
 
 #endif // __DNS_TRACKER_H__
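
Note on the new offset math: DNS over UDP begins immediately after the 8-byte UDP header, while DNS over TCP carries a 2-byte length prefix after the TCP header (whose size is doff 32-bit words). The following is a minimal user-space sketch, not part of the patch, that mirrors calc_dns_header_offset and the DNS_QR_FLAG test so the logic can be sanity-checked outside the verifier; the dnsHeaderOffset helper and the hard-coded protocol numbers are illustrative assumptions.

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    const dnsQRFlag = 0x8000 // QR bit of the DNS flags word: 0 = query, 1 = response

    // dnsHeaderOffset mirrors calc_dns_header_offset: for UDP the DNS header
    // starts right after the 8-byte UDP header; for TCP it starts after the
    // TCP header (doff 32-bit words) plus the 2-byte DNS length prefix.
    func dnsHeaderOffset(proto uint8, tcpDoff uint8) int {
        switch proto {
        case 6: // IPPROTO_TCP
            return int(tcpDoff)*4 + 2
        case 17: // IPPROTO_UDP
            return 8 // sizeof(struct udphdr)
        }
        return 0
    }

    func main() {
        // First four bytes of a DNS response header: id=0x1234, flags=0x8180.
        hdr := []byte{0x12, 0x34, 0x81, 0x80}
        id := binary.BigEndian.Uint16(hdr[0:2])
        flags := binary.BigEndian.Uint16(hdr[2:4])
        isResponse := flags&dnsQRFlag != 0
        fmt.Printf("id=%#x response=%v udpOffset=%d tcpOffset(doff=5)=%d\n",
            id, isResponse, dnsHeaderOffset(17, 0), dnsHeaderOffset(6, 5))
    }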
diff --git a/bpf/flows.c b/bpf/flows.c
index e51e60a8a64391680cc51d156b606d658936ea63..219660e646f4c2d974e1456474704c3833abd05b 100644
--- a/bpf/flows.c
+++ b/bpf/flows.c
@@ -73,6 +73,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         calculate_flow_rtt(&pkt, direction, data_end);
     }
 
+    if (enable_dns_tracking) {
+        track_dns_packet(skb, &pkt);
+    }
     // TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
     // a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
     flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
@@ -91,7 +94,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         if (pkt.rtt > aggregate_flow->flow_rtt) {
             aggregate_flow->flow_rtt = pkt.rtt;
         }
-
+        aggregate_flow->dns_record.id = pkt.dns_id;
+        aggregate_flow->dns_record.flags = pkt.dns_flags;
+        aggregate_flow->dns_record.latency = pkt.dns_latency;
         long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
         if (trace_messages && ret != 0) {
             // usually error -16 (-EBUSY) is printed here.
@@ -111,6 +116,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
             .flags = pkt.flags,
             .flow_rtt = pkt.rtt,
             .dscp = pkt.dscp,
+            .dns_record.id = pkt.dns_id,
+            .dns_record.flags = pkt.dns_flags,
+            .dns_record.latency = pkt.dns_latency,
         };
 
         // even if we know that the entry is new, another CPU might be concurrently inserting a flow
diff --git a/bpf/types.h b/bpf/types.h
index 6084b14906ad5a57ad783f4da6a533e0e6c4a3b5..f9790427be578701dcbd0f5ad942eb094fc83cd1 100644
--- a/bpf/types.h
+++ b/bpf/types.h
@@ -159,6 +159,9 @@ typedef struct pkt_info_t {
     void *l4_hdr;    // Stores the actual l4 header
     u64 rtt;         // rtt calculated from the flow if possible. else zero
     u8 dscp;         // IPv4/6 DSCP value
+    u16 dns_id;      // DNS query/response id
+    u16 dns_flags;   // DNS flags word (see DNS_QR_FLAG)
+    u64 dns_latency; // DNS latency in ns: response timestamp minus query timestamp
 } pkt_info;
 
 // Structure for payload metadata
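
The DNS measurements now travel with pkt_info and land in the flow's dns_record during aggregation, both when updating an existing entry and when initializing a new one. As a rough consumer-side illustration of what those three fields mean (hypothetical type and field names, not the generated bindings): latency is the query-to-response delta in nanoseconds, and the low four bits of the DNS flags word carry the response code (RCODE).

    package main

    import (
        "fmt"
        "time"
    )

    // dnsRecord is an illustrative stand-in for the aggregated dns_record.
    type dnsRecord struct {
        ID      uint16 // DNS transaction id
        Flags   uint16 // raw DNS flags word
        Latency uint64 // response minus query timestamp, in nanoseconds
    }

    func describe(r dnsRecord) string {
        rcode := r.Flags & 0x000F // RFC 1035: RCODE is the low 4 bits
        return fmt.Sprintf("dns id=%d rcode=%d latency=%s",
            r.ID, rcode, time.Duration(r.Latency))
    }

    func main() {
        // 0x8180: response bit set, recursion desired/available, RCODE=0.
        fmt.Println(describe(dnsRecord{ID: 0x1234, Flags: 0x8180, Latency: 1_500_000}))
    }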
diff --git a/pkg/ebpf/bpf_bpfeb.go b/pkg/ebpf/bpf_bpfeb.go
index e95e7b9978de2848a370d47c8e31b2b87370fa97..bb10a159179776cc442fbc4a294b3aad8dc84688 100644
--- a/pkg/ebpf/bpf_bpfeb.go
+++ b/pkg/ebpf/bpf_bpfeb.go
@@ -128,7 +128,6 @@ type BpfProgramSpecs struct {
 	IngressFlowParse *ebpf.ProgramSpec `ebpf:"ingress_flow_parse"`
 	IngressPcaParse  *ebpf.ProgramSpec `ebpf:"ingress_pca_parse"`
 	KfreeSkb         *ebpf.ProgramSpec `ebpf:"kfree_skb"`
-	TraceNetPackets  *ebpf.ProgramSpec `ebpf:"trace_net_packets"`
 }
 
 // BpfMapSpecs contains maps before they are loaded into the kernel.
@@ -187,7 +186,6 @@ type BpfPrograms struct {
 	IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
 	IngressPcaParse  *ebpf.Program `ebpf:"ingress_pca_parse"`
 	KfreeSkb         *ebpf.Program `ebpf:"kfree_skb"`
-	TraceNetPackets  *ebpf.Program `ebpf:"trace_net_packets"`
 }
 
 func (p *BpfPrograms) Close() error {
@@ -197,7 +195,6 @@ func (p *BpfPrograms) Close() error {
 		p.IngressFlowParse,
 		p.IngressPcaParse,
 		p.KfreeSkb,
-		p.TraceNetPackets,
 	)
 }
diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o
index 670b6687a16def2f11d22b583ded9e4e3b001cfb..12b4001b8b2cf7624ef8cbbdfb3d4375a891a48f 100644
Binary files a/pkg/ebpf/bpf_bpfeb.o and b/pkg/ebpf/bpf_bpfeb.o differ
diff --git a/pkg/ebpf/bpf_bpfel.go b/pkg/ebpf/bpf_bpfel.go
index 4a84d59b4b399889b052d9ec46f5d88c07265b58..8556d19a5a1c3245c922e45cae829831f1c31789 100644
--- a/pkg/ebpf/bpf_bpfel.go
+++ b/pkg/ebpf/bpf_bpfel.go
@@ -128,7 +128,6 @@ type BpfProgramSpecs struct {
 	IngressFlowParse *ebpf.ProgramSpec `ebpf:"ingress_flow_parse"`
 	IngressPcaParse  *ebpf.ProgramSpec `ebpf:"ingress_pca_parse"`
 	KfreeSkb         *ebpf.ProgramSpec `ebpf:"kfree_skb"`
-	TraceNetPackets  *ebpf.ProgramSpec `ebpf:"trace_net_packets"`
 }
 
 // BpfMapSpecs contains maps before they are loaded into the kernel.
@@ -187,7 +186,6 @@ type BpfPrograms struct {
 	IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
 	IngressPcaParse  *ebpf.Program `ebpf:"ingress_pca_parse"`
 	KfreeSkb         *ebpf.Program `ebpf:"kfree_skb"`
-	TraceNetPackets  *ebpf.Program `ebpf:"trace_net_packets"`
 }
 
 func (p *BpfPrograms) Close() error {
@@ -197,7 +195,6 @@ func (p *BpfPrograms) Close() error {
 		p.IngressFlowParse,
 		p.IngressPcaParse,
 		p.KfreeSkb,
-		p.TraceNetPackets,
 	)
 }
diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o
index cb818ed55cfe5d7da5c9a194d6d6a762e78a0523..5e001e1de72aa19a7028827941ae9dc6cfda9b3b 100644
Binary files a/pkg/ebpf/bpf_bpfel.o and b/pkg/ebpf/bpf_bpfel.o differ
diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go
index 57631b29b7bd8010b9d1e7bdceee64df78393d20..cf213ce6e9bb183aa55398ff344d45ab01dcdab8 100644
--- a/pkg/ebpf/tracer.go
+++ b/pkg/ebpf/tracer.go
@@ -35,14 +35,14 @@ const (
 	flowSequencesMap = "flow_sequences"
 	dnsLatencyMap    = "dns_flows"
 	// constants defined in flows.c as "volatile const"
-	constSampling      = "sampling"
-	constTraceMessages = "trace_messages"
-	constEnableRtt     = "enable_rtt"
-	pktDropHook        = "kfree_skb"
-	dnsTraceHook       = "net_dev_queue"
-	constPcaPort       = "pca_port"
-	constPcaProto      = "pca_proto"
-	pcaRecordsMap      = "packet_record"
+	constSampling          = "sampling"
+	constTraceMessages     = "trace_messages"
+	constEnableRtt         = "enable_rtt"
+	constEnableDNSTracking = "enable_dns_tracking"
+	pktDropHook            = "kfree_skb"
+	constPcaPort           = "pca_port"
+	constPcaProto          = "pca_proto"
+	pcaRecordsMap          = "packet_record"
 )
 
 var log = logrus.WithField("component", "ebpf.FlowFetcher")
@@ -53,16 +53,15 @@ var plog = logrus.WithField("component", "ebpf.PacketFetcher")
 // and to flows that are forwarded by the kernel via ringbuffer because could not be aggregated
 // in the map
 type FlowFetcher struct {
-	objects              *BpfObjects
-	qdiscs               map[ifaces.Interface]*netlink.GenericQdisc
-	egressFilters        map[ifaces.Interface]*netlink.BpfFilter
-	ingressFilters       map[ifaces.Interface]*netlink.BpfFilter
-	ringbufReader        *ringbuf.Reader
-	cacheMaxSize         int
-	enableIngress        bool
-	enableEgress         bool
-	pktDropsTracePoint   link.Link
-	dnsTrackerTracePoint link.Link
+	objects            *BpfObjects
+	qdiscs             map[ifaces.Interface]*netlink.GenericQdisc
+	egressFilters      map[ifaces.Interface]*netlink.BpfFilter
+	ingressFilters     map[ifaces.Interface]*netlink.BpfFilter
+	ringbufReader      *ringbuf.Reader
+	cacheMaxSize       int
+	enableIngress      bool
+	enableEgress       bool
+	pktDropsTracePoint link.Link
 }
 
 type FlowFetcherConfig struct {
@@ -113,14 +112,20 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
 		log.Debugf("RTT calculations are enabled")
 	}
 
-	if !cfg.DNSTracker {
+	enableDNSTracking := 0
+	if cfg.DNSTracker {
+		enableDNSTracking = 1
+	}
+
+	if enableDNSTracking == 0 {
 		spec.Maps[dnsLatencyMap].MaxEntries = 1
 	}
 
 	if err := spec.RewriteConstants(map[string]interface{}{
-		constSampling:      uint32(cfg.Sampling),
-		constTraceMessages: uint8(traceMsgs),
-		constEnableRtt:     uint8(enableRtt),
+		constSampling:          uint32(cfg.Sampling),
+		constTraceMessages:     uint8(traceMsgs),
+		constEnableRtt:         uint8(enableRtt),
+		constEnableDNSTracking: uint8(enableDNSTracking),
 	}); err != nil {
 		return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
 	}
@@ -149,30 +154,21 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
 		}
 	}
 
-	var dnsTrackerLink link.Link
-	if cfg.DNSTracker {
-		dnsTrackerLink, err = link.Tracepoint("net", dnsTraceHook, objects.TraceNetPackets, nil)
-		if err != nil {
-			return nil, fmt.Errorf("failed to attach the BPF program to trace_net_packets: %w", err)
-		}
-	}
-
 	// read events from igress+egress ringbuffer
 	flows, err := ringbuf.NewReader(objects.DirectFlows)
 	if err != nil {
 		return nil, fmt.Errorf("accessing to ringbuffer: %w", err)
 	}
 	return &FlowFetcher{
-		objects:              &objects,
-		ringbufReader:        flows,
-		egressFilters:        map[ifaces.Interface]*netlink.BpfFilter{},
-		ingressFilters:       map[ifaces.Interface]*netlink.BpfFilter{},
-		qdiscs:               map[ifaces.Interface]*netlink.GenericQdisc{},
-		cacheMaxSize:         cfg.CacheMaxSize,
-		enableIngress:        cfg.EnableIngress,
-		enableEgress:         cfg.EnableEgress,
-		pktDropsTracePoint:   pktDropsLink,
-		dnsTrackerTracePoint: dnsTrackerLink,
+		objects:            &objects,
+		ringbufReader:      flows,
+		egressFilters:      map[ifaces.Interface]*netlink.BpfFilter{},
+		ingressFilters:     map[ifaces.Interface]*netlink.BpfFilter{},
+		qdiscs:             map[ifaces.Interface]*netlink.GenericQdisc{},
+		cacheMaxSize:       cfg.CacheMaxSize,
+		enableIngress:      cfg.EnableIngress,
+		enableEgress:       cfg.EnableEgress,
+		pktDropsTracePoint: pktDropsLink,
 	}, nil
 }
@@ -301,11 +297,6 @@ func (m *FlowFetcher) Close() error {
 			errs = append(errs, err)
 		}
 	}
-	if m.dnsTrackerTracePoint != nil {
-		if err := m.dnsTrackerTracePoint.Close(); err != nil {
-			errs = append(errs, err)
-		}
-	}
 	// m.ringbufReader.Read is a blocking operation, so we need to close the ring buffer
 	// from another goroutine to avoid the system not being able to exit if there
 	// isn't traffic in a given interface
@@ -491,7 +482,6 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf
 	type NewBpfPrograms struct {
 		EgressFlowParse  *ebpf.Program `ebpf:"egress_flow_parse"`
 		IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
-		TraceNetPackets  *ebpf.Program `ebpf:"trace_net_packets"`
 	}
 	type NewBpfObjects struct {
 		NewBpfPrograms
@@ -517,7 +507,6 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf
 		objects.FlowSequences = newObjects.FlowSequences
 		objects.EgressFlowParse = newObjects.EgressFlowParse
 		objects.IngressFlowParse = newObjects.IngressFlowParse
-		objects.TraceNetPackets = newObjects.TraceNetPackets
 		objects.KfreeSkb = nil
 	} else {
 		if err := spec.LoadAndAssign(&objects, nil); err != nil {
@@ -574,13 +563,12 @@ func NewPacketFetcher(
 	objects.DirectFlows = nil
 	objects.AggregatedFlows = nil
 	objects.FlowSequences = nil
-	objects.TraceNetPackets = nil
 	delete(spec.Programs, aggregatedFlowsMap)
 	delete(spec.Programs, flowSequencesMap)
 	delete(spec.Programs, constSampling)
 	delete(spec.Programs, constTraceMessages)
 	delete(spec.Programs, constEnableRtt)
-	delete(spec.Programs, dnsTraceHook)
+	delete(spec.Programs, constEnableDNSTracking)
 
 	pcaPort := 0
 	pcaProto := 0
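
With the tracepoint gone, enabling DNS tracking becomes a pure load-time decision: the agent rewrites the enable_dns_tracking constant before loading the collection, and when tracking is disabled it shrinks the dns_flows map to a single entry. A hedged usage sketch follows, assuming the module path github.com/netobserv/netobserv-ebpf-agent and showing only the config fields visible in this diff:

    package main

    import (
        "log"

        "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
    )

    func main() {
        cfg := &ebpf.FlowFetcherConfig{
            Sampling:      1,    // no sampling, inspect every packet
            CacheMaxSize:  5000, // capacity of the aggregated-flows map
            EnableIngress: true,
            EnableEgress:  true,
            DNSTracker:    true, // becomes enable_dns_tracking=1 via RewriteConstants
        }
        fetcher, err := ebpf.NewFlowFetcher(cfg)
        if err != nil {
            log.Fatalf("creating flow fetcher: %v", err)
        }
        defer fetcher.Close()
    }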