Unverified Commit 0891b34d authored by Mohamed S. Mahmoud, committed by GitHub

NETOBSERV-1245: fix TCP DNS query (#206)


Signed-off-by: msherif1234 <mmahmoud@redhat.com>
parent f1e5f34e
@@ -8,5 +8,5 @@ volatile const u8 trace_messages = 0;
volatile const u8 enable_rtt = 0;
volatile const u16 pca_port = 0;
volatile const u8 pca_proto = 0;
volatile const u8 enable_dns_tracking = 0;
#endif //__CONFIGS_H__
/*
    lightweight DNS tracker using trace points.
    lightweight DNS tracker.
*/
#ifndef __DNS_TRACKER_H__
@@ -19,27 +19,6 @@ struct dns_header {
    u16 arcount;
};
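For orientation: struct dns_header mirrors the fixed 12-byte DNS wire header (RFC 1035 section 4.1.1). The tracker below keys on the id field and tests the QR bit of the flags word to tell queries from responses. A minimal stand-alone Go sketch of that decoding (illustrative only, not part of the patch; the constant mirrors DNS_QR_FLAG in the eBPF code):

package main

import (
    "encoding/binary"
    "fmt"
)

// The top bit of the DNS flags word is 0 for a query, 1 for a response.
const dnsQRFlag = 0x8000

func main() {
    // First 4 of the 12 header bytes: id=0x1234, flags=0x8180
    // (a typical response: QR=1, RD=1, RA=1).
    hdr := []byte{0x12, 0x34, 0x81, 0x80}
    id := binary.BigEndian.Uint16(hdr[0:2])    // bpf_ntohs(dns.id)
    flags := binary.BigEndian.Uint16(hdr[2:4]) // bpf_ntohs(dns.flags)
    fmt.Printf("id=%#04x response=%v\n", id, flags&dnsQRFlag != 0)
}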
static inline void find_or_create_dns_flow(flow_id *id, struct dns_header *dns, int len, u16 flags, u64 latency) {
    flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
    u64 current_time = bpf_ktime_get_ns();
    // the net_dev_queue tracepoint hook runs before the TC hooks, so the flow shouldn't exist yet;
    // if it does, we have a stale DNS query/response or are in the middle of a TCP flow, so do nothing
    if (aggregate_flow == NULL) {
        // there is no matching flow, so create a new one with the DNS info
        flow_metrics new_flow;
        __builtin_memset(&new_flow, 0, sizeof(new_flow));
        new_flow.start_mono_time_ts = current_time;
        new_flow.end_mono_time_ts = current_time;
        new_flow.packets = 1;
        new_flow.bytes = len;
        new_flow.flags = flags;
        new_flow.dns_record.id = bpf_ntohs(dns->id);
        new_flow.dns_record.flags = bpf_ntohs(dns->flags);
        new_flow.dns_record.latency = latency;
        bpf_map_update_elem(&aggregated_flows, id, &new_flow, BPF_ANY);
    }
}
static inline void fill_dns_id(flow_id *id, dns_flow_id *dns_flow, u16 dns_id, bool reverse) {
    dns_flow->id = dns_id;
    dns_flow->protocol = id->transport_protocol;
@@ -56,73 +35,72 @@ static inline void fill_dns_id(flow_id *id, dns_flow_id *dns_flow, u16 dns_id,
    }
}
static inline int trace_dns(struct sk_buff *skb) {
    flow_id id;
    u8 protocol = 0;
    u16 family = 0, flags = 0, len = 0;
    __builtin_memset(&id, 0, sizeof(id));
static __always_inline u8 calc_dns_header_offset(pkt_info *pkt, void *data_end) {
    u8 len = 0;
    switch (pkt->id->transport_protocol) {
    case IPPROTO_TCP: {
        struct tcphdr *tcp = (struct tcphdr *) pkt->l4_hdr;
        if (!tcp || ((void *)tcp + sizeof(*tcp) > data_end)) {
            return 0;
        }
        len = tcp->doff * sizeof(u32) + 2; // DNS over TCP has 2 bytes of length at the beginning
        break;
    }
    case IPPROTO_UDP: {
        struct udphdr *udp = (struct udphdr *) pkt->l4_hdr;
        if (!udp || ((void *)udp + sizeof(*udp) > data_end)) {
            return 0;
        }
        len = bpf_ntohs(udp->len);
        // make sure udp payload doesn't exceed max msg size
        if (len - sizeof(struct udphdr) > UDP_MAXMSG) {
            return 0;
        }
        // set the length to udp hdr size as it will be used to locate dns header
        len = sizeof(struct udphdr);
        break;
    }
    }
    return len;
}
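The TCP branch adds 2 because DNS over TCP prefixes every message with a two-byte length field (RFC 1035 section 4.2.2), which is exactly what the NETOBSERV-1245 fix accounts for: the DNS header sits past the TCP header (doff 32-bit words) plus that prefix, whereas over UDP it directly follows the fixed 8-byte UDP header. The same arithmetic as a runnable Go sketch (illustrative only):

package main

import "fmt"

// dnsHeaderOffset mirrors calc_dns_header_offset(): the distance from the
// start of the L4 header to the DNS header.
func dnsHeaderOffset(proto string, doff uint8) uint8 {
    switch proto {
    case "tcp":
        return doff*4 + 2 // TCP header (doff 32-bit words) + 2-byte DNS length prefix
    case "udp":
        return 8 // sizeof(struct udphdr); no length prefix over UDP
    }
    return 0
}

func main() {
    fmt.Println(dnsHeaderOffset("tcp", 5)) // 22 for an option-less TCP header
    fmt.Println(dnsHeaderOffset("udp", 0)) // 8
}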
    id.if_index = skb->skb_iif;
static __always_inline void track_dns_packet(struct __sk_buff *skb, pkt_info *pkt) {
    void *data_end = (void *)(long)skb->data_end;
    if (pkt->id->dst_port == DNS_PORT || pkt->id->src_port == DNS_PORT) {
        dns_flow_id dns_req;
    // read L2 info
    set_key_with_l2_info(skb, &id, &family);
        u8 len = calc_dns_header_offset(pkt, data_end);
        if (!len) {
            return;
        }
    // read L3 info
    set_key_with_l3_info(skb, family, &id, &protocol);
        struct dns_header dns;
        u32 dns_offset = (long)pkt->l4_hdr - (long)skb->data + len;
    switch (protocol) {
    case IPPROTO_UDP:
        len = set_key_with_udp_info(skb, &id, IPPROTO_UDP);
        // make sure udp payload doesn't exceed max msg size
        if (len - sizeof(struct udphdr) > UDP_MAXMSG) {
            return -1;
        if (bpf_skb_load_bytes(skb, dns_offset, &dns, sizeof(dns)) < 0) {
            return;
        }
        // set the length to udp hdr size as it will be used below to locate dns header
        len = sizeof(struct udphdr);
        break;
    case IPPROTO_TCP:
        len = set_key_with_tcp_info(skb, &id, IPPROTO_TCP, &flags);
        break;
    default:
        return -1;
    }
    // check for DNS packets
    if (id.dst_port == DNS_PORT || id.src_port == DNS_PORT) {
        struct dns_header dns;
        dns_flow_id dns_req;
        u64 latency = 0;
        bpf_probe_read(&dns, sizeof(dns), (struct dns_header *)(skb->head + skb->transport_header + len));
        if ((bpf_ntohs(dns.flags) & DNS_QR_FLAG) == 0) { /* dns query */
            fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), false);
        u16 dns_id = bpf_ntohs(dns.id);
        u16 flags = bpf_ntohs(dns.flags);
        u64 ts = bpf_ktime_get_ns();
        if ((flags & DNS_QR_FLAG) == 0) { /* dns query */
            fill_dns_id(pkt->id, &dns_req, dns_id, false);
            if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
                u64 ts = bpf_ktime_get_ns();
                bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
            }
            id.direction = EGRESS;
        } else { /* dns response */
            id.direction = INGRESS;
            fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), true);
            fill_dns_id(pkt->id, &dns_req, dns_id, true);
            u64 *value = bpf_map_lookup_elem(&dns_flows, &dns_req);
            if (value != NULL) {
                latency = bpf_ktime_get_ns() - *value;
                pkt->dns_latency = ts - *value;
                pkt->dns_id = dns_id;
                pkt->dns_flags = flags;
                bpf_map_delete_elem(&dns_flows, &dns_req);
                find_or_create_dns_flow(&id, &dns, skb->len, flags, latency);
            }
        } // end of dns response
    } // end of dns port check
    return 0;
}
SEC("tracepoint/net/net_dev_queue")
int trace_net_packets(struct trace_event_raw_net_dev_template *args) {
struct sk_buff skb;
__builtin_memset(&skb, 0, sizeof(skb));
bpf_probe_read(&skb, sizeof(struct sk_buff), args->skbaddr);
return trace_dns(&skb);
}
#endif // __DNS_TRACKER_H__
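The rewritten tracker measures latency by storing a timestamp in the dns_flows map when a query is seen and retrieving (then deleting) it when the matching response arrives; the key pairs the DNS transaction ID with the flow's 5-tuple, reversed on the response side by fill_dns_id's reverse flag so both directions hash to the same entry. A user-space Go sketch of that pairing (a simplification under those assumptions, with a plain map standing in for the BPF hash map):

package main

import "fmt"

// dnsKey stands in for dns_flow_id: the DNS transaction ID plus the
// 5-tuple (reversed on the response path).
type dnsKey struct {
    id        uint16
    fiveTuple string // placeholder for src/dst addresses, ports, protocol
}

var dnsFlows = map[dnsKey]uint64{} // stands in for the dns_flows BPF hash map

func onQuery(k dnsKey, nowNs uint64) {
    if _, ok := dnsFlows[k]; !ok { // bpf_map_lookup_elem() == NULL
        dnsFlows[k] = nowNs // bpf_map_update_elem(..., BPF_ANY)
    }
}

func onResponse(k dnsKey, nowNs uint64) uint64 {
    ts, ok := dnsFlows[k]
    if !ok {
        return 0 // unmatched response: no latency recorded
    }
    delete(dnsFlows, k) // bpf_map_delete_elem()
    return nowNs - ts   // becomes pkt->dns_latency
}

func main() {
    k := dnsKey{id: 0x1234, fiveTuple: "10.0.0.2:41000->10.0.0.1:53/tcp"}
    onQuery(k, 1_000)
    fmt.Println("latency ns:", onResponse(k, 4_500)) // 3500
}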
@@ -73,6 +73,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
        calculate_flow_rtt(&pkt, direction, data_end);
    }
    if (enable_dns_tracking) {
        track_dns_packet(skb, &pkt);
    }
    // TODO: we need to add a spinlock here when we deprecate versions prior to 5.1, or provide
    // a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
    flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
@@ -91,7 +94,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
        if (pkt.rtt > aggregate_flow->flow_rtt) {
            aggregate_flow->flow_rtt = pkt.rtt;
        }
        aggregate_flow->dns_record.id = pkt.dns_id;
        aggregate_flow->dns_record.flags = pkt.dns_flags;
        aggregate_flow->dns_record.latency = pkt.dns_latency;
        long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
        if (trace_messages && ret != 0) {
            // usually error -16 (-EBUSY) is printed here.
@@ -111,6 +116,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
            .flags = pkt.flags,
            .flow_rtt = pkt.rtt,
            .dscp = pkt.dscp,
            .dns_record.id = pkt.dns_id,
            .dns_record.flags = pkt.dns_flags,
            .dns_record.latency = pkt.dns_latency,
        };
        // even if we know that the entry is new, another CPU might be concurrently inserting a flow
......
@@ -159,6 +159,9 @@ typedef struct pkt_info_t {
    void *l4_hdr; // Stores the actual l4 header
    u64 rtt;      // rtt calculated from the flow if possible, else zero
    u8 dscp;      // IPv4/6 DSCP value
    u16 dns_id;
    u16 dns_flags;
    u64 dns_latency;
} pkt_info;
// Structure for payload metadata
......
......@@ -128,7 +128,6 @@ type BpfProgramSpecs struct {
IngressFlowParse *ebpf.ProgramSpec `ebpf:"ingress_flow_parse"`
IngressPcaParse *ebpf.ProgramSpec `ebpf:"ingress_pca_parse"`
KfreeSkb *ebpf.ProgramSpec `ebpf:"kfree_skb"`
TraceNetPackets *ebpf.ProgramSpec `ebpf:"trace_net_packets"`
}
// BpfMapSpecs contains maps before they are loaded into the kernel.
@@ -187,7 +186,6 @@ type BpfPrograms struct {
    IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
    IngressPcaParse  *ebpf.Program `ebpf:"ingress_pca_parse"`
    KfreeSkb         *ebpf.Program `ebpf:"kfree_skb"`
    TraceNetPackets  *ebpf.Program `ebpf:"trace_net_packets"`
}
func (p *BpfPrograms) Close() error {
@@ -197,7 +195,6 @@ func (p *BpfPrograms) Close() error {
        p.IngressFlowParse,
        p.IngressPcaParse,
        p.KfreeSkb,
        p.TraceNetPackets,
    )
}
......
@@ -128,7 +128,6 @@ type BpfProgramSpecs struct {
    IngressFlowParse *ebpf.ProgramSpec `ebpf:"ingress_flow_parse"`
    IngressPcaParse  *ebpf.ProgramSpec `ebpf:"ingress_pca_parse"`
    KfreeSkb         *ebpf.ProgramSpec `ebpf:"kfree_skb"`
    TraceNetPackets  *ebpf.ProgramSpec `ebpf:"trace_net_packets"`
}
// BpfMapSpecs contains maps before they are loaded into the kernel.
@@ -187,7 +186,6 @@ type BpfPrograms struct {
    IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
    IngressPcaParse  *ebpf.Program `ebpf:"ingress_pca_parse"`
    KfreeSkb         *ebpf.Program `ebpf:"kfree_skb"`
    TraceNetPackets  *ebpf.Program `ebpf:"trace_net_packets"`
}
func (p *BpfPrograms) Close() error {
@@ -197,7 +195,6 @@ func (p *BpfPrograms) Close() error {
        p.IngressFlowParse,
        p.IngressPcaParse,
        p.KfreeSkb,
        p.TraceNetPackets,
    )
}
......
@@ -35,14 +35,14 @@ const (
flowSequencesMap = "flow_sequences"
dnsLatencyMap = "dns_flows"
// constants defined in flows.c as "volatile const"
constSampling = "sampling"
constTraceMessages = "trace_messages"
constEnableRtt = "enable_rtt"
pktDropHook = "kfree_skb"
dnsTraceHook = "net_dev_queue"
constPcaPort = "pca_port"
constPcaProto = "pca_proto"
pcaRecordsMap = "packet_record"
constSampling = "sampling"
constTraceMessages = "trace_messages"
constEnableRtt = "enable_rtt"
constEnableDNSTracking = "enable_dns_tracking"
pktDropHook = "kfree_skb"
constPcaPort = "pca_port"
constPcaProto = "pca_proto"
pcaRecordsMap = "packet_record"
)
var log = logrus.WithField("component", "ebpf.FlowFetcher")
@@ -53,16 +53,15 @@ var plog = logrus.WithField("component", "ebpf.PacketFetcher")
// and to flows that are forwarded by the kernel via the ringbuffer because they could not
// be aggregated in the map
type FlowFetcher struct {
    objects              *BpfObjects
    qdiscs               map[ifaces.Interface]*netlink.GenericQdisc
    egressFilters        map[ifaces.Interface]*netlink.BpfFilter
    ingressFilters       map[ifaces.Interface]*netlink.BpfFilter
    ringbufReader        *ringbuf.Reader
    cacheMaxSize         int
    enableIngress        bool
    enableEgress         bool
    pktDropsTracePoint   link.Link
    dnsTrackerTracePoint link.Link
    objects            *BpfObjects
    qdiscs             map[ifaces.Interface]*netlink.GenericQdisc
    egressFilters      map[ifaces.Interface]*netlink.BpfFilter
    ingressFilters     map[ifaces.Interface]*netlink.BpfFilter
    ringbufReader      *ringbuf.Reader
    cacheMaxSize       int
    enableIngress      bool
    enableEgress       bool
    pktDropsTracePoint link.Link
}
type FlowFetcherConfig struct {
@@ -113,14 +112,20 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
log.Debugf("RTT calculations are enabled")
}
if !cfg.DNSTracker {
enableDNSTracking := 0
if cfg.DNSTracker {
enableDNSTracking = 1
}
if enableDNSTracking == 0 {
spec.Maps[dnsLatencyMap].MaxEntries = 1
}
if err := spec.RewriteConstants(map[string]interface{}{
constSampling: uint32(cfg.Sampling),
constTraceMessages: uint8(traceMsgs),
constEnableRtt: uint8(enableRtt),
constSampling: uint32(cfg.Sampling),
constTraceMessages: uint8(traceMsgs),
constEnableRtt: uint8(enableRtt),
constEnableDNSTracking: uint8(enableDNSTracking),
}); err != nil {
return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
}
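For context, RewriteConstants patches "volatile const" globals in the compiled BPF object before it is loaded, which is how enable_dns_tracking reaches the C code above; shrinking the unused dns_flows map to one entry keeps the map present for the verifier while wasting almost no memory. A minimal sketch of the pattern with cilium/ebpf (hypothetical object-file name; error handling reduced to fatal logs):

package main

import (
    "log"

    "github.com/cilium/ebpf"
)

func main() {
    // Hypothetical object-file name; the agent embeds its own via bpf2go.
    spec, err := ebpf.LoadCollectionSpec("bpf_bpfel.o")
    if err != nil {
        log.Fatal(err)
    }
    enableDNSTracking := uint8(0)
    if enableDNSTracking == 0 {
        // The map must still exist for the programs to load, but a single
        // entry suffices when nothing will ever be written to it.
        spec.Maps["dns_flows"].MaxEntries = 1
    }
    if err := spec.RewriteConstants(map[string]interface{}{
        "enable_dns_tracking": enableDNSTracking,
    }); err != nil {
        log.Fatalf("rewriting BPF constants: %v", err)
    }
}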
@@ -149,30 +154,21 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
        }
    }
    var dnsTrackerLink link.Link
    if cfg.DNSTracker {
        dnsTrackerLink, err = link.Tracepoint("net", dnsTraceHook, objects.TraceNetPackets, nil)
        if err != nil {
            return nil, fmt.Errorf("failed to attach the BPF program to trace_net_packets: %w", err)
        }
    }
    // read events from ingress+egress ringbuffer
    flows, err := ringbuf.NewReader(objects.DirectFlows)
    if err != nil {
        return nil, fmt.Errorf("accessing to ringbuffer: %w", err)
    }
    return &FlowFetcher{
        objects:              &objects,
        ringbufReader:        flows,
        egressFilters:        map[ifaces.Interface]*netlink.BpfFilter{},
        ingressFilters:       map[ifaces.Interface]*netlink.BpfFilter{},
        qdiscs:               map[ifaces.Interface]*netlink.GenericQdisc{},
        cacheMaxSize:         cfg.CacheMaxSize,
        enableIngress:        cfg.EnableIngress,
        enableEgress:         cfg.EnableEgress,
        pktDropsTracePoint:   pktDropsLink,
        dnsTrackerTracePoint: dnsTrackerLink,
        objects:            &objects,
        ringbufReader:      flows,
        egressFilters:      map[ifaces.Interface]*netlink.BpfFilter{},
        ingressFilters:     map[ifaces.Interface]*netlink.BpfFilter{},
        qdiscs:             map[ifaces.Interface]*netlink.GenericQdisc{},
        cacheMaxSize:       cfg.CacheMaxSize,
        enableIngress:      cfg.EnableIngress,
        enableEgress:       cfg.EnableEgress,
        pktDropsTracePoint: pktDropsLink,
    }, nil
}
@@ -301,11 +297,6 @@ func (m *FlowFetcher) Close() error {
            errs = append(errs, err)
        }
    }
    if m.dnsTrackerTracePoint != nil {
        if err := m.dnsTrackerTracePoint.Close(); err != nil {
            errs = append(errs, err)
        }
    }
    // m.ringbufReader.Read is a blocking operation, so we need to close the ring buffer
    // from another goroutine to avoid the system not being able to exit if there
    // isn't traffic in a given interface
@@ -491,7 +482,6 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf
    type NewBpfPrograms struct {
        EgressFlowParse  *ebpf.Program `ebpf:"egress_flow_parse"`
        IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
        TraceNetPackets  *ebpf.Program `ebpf:"trace_net_packets"`
    }
    type NewBpfObjects struct {
        NewBpfPrograms
@@ -517,7 +507,6 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf
        objects.FlowSequences = newObjects.FlowSequences
        objects.EgressFlowParse = newObjects.EgressFlowParse
        objects.IngressFlowParse = newObjects.IngressFlowParse
        objects.TraceNetPackets = newObjects.TraceNetPackets
        objects.KfreeSkb = nil
    } else {
        if err := spec.LoadAndAssign(&objects, nil); err != nil {
@@ -574,13 +563,12 @@ func NewPacketFetcher(
    objects.DirectFlows = nil
    objects.AggregatedFlows = nil
    objects.FlowSequences = nil
    objects.TraceNetPackets = nil
    delete(spec.Programs, aggregatedFlowsMap)
    delete(spec.Programs, flowSequencesMap)
    delete(spec.Programs, constSampling)
    delete(spec.Programs, constTraceMessages)
    delete(spec.Programs, constEnableRtt)
    delete(spec.Programs, dnsTraceHook)
    delete(spec.Programs, constEnableDNSTracking)
    pcaPort := 0
    pcaProto := 0
......