Unverified commit 27baf291 authored by Mohamed S. Mahmoud, committed by GitHub

NETOBSERV-1061: Add TCP drop and DNS tracking hooks (#115)


* Add TCP drop hook and update flow metrics

Signed-off-by: msherif1234 <mmahmoud@redhat.com>

* Update agent e2e manifest to mount kernel debug volume

Signed-off-by: msherif1234 <mmahmoud@redhat.com>

* Add lightweight DNS tracker hook

Signed-off-by: msherif1234 <mmahmoud@redhat.com>

* Rename tcp_drops fields to reflect that they hold the latest drop;
fix lint errors;
flatten the ICMP block

Signed-off-by: msherif1234 <mmahmoud@redhat.com>

---------

Signed-off-by: msherif1234 <mmahmoud@redhat.com>
parent 2d63d907
Showing 742 additions and 267 deletions
#ifndef __CONFIGS_H__
#define __CONFIGS_H__
// Constant definitions, to be overridden by the invoker
volatile const u32 sampling = 0;
volatile const u8 trace_messages = 0;
#endif //__CONFIGS_H__
/*
    Lightweight DNS tracker using tracepoints.
*/
#ifndef __DNS_TRACKER_H__
#define __DNS_TRACKER_H__
#include "utils.h"
#define DNS_PORT 53
#define DNS_QR_FLAG 0x8000
#define UDP_MAXMSG 512
struct dns_header {
    u16 id;
    u16 flags;
    u16 qdcount;
    u16 ancount;
    u16 nscount;
    u16 arcount;
};
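The tracker only needs this fixed 12-byte header: the id and flags fields identify the transaction, and the QR bit (DNS_QR_FLAG, 0x8000) distinguishes queries from responses. For reference, the same decoding can be done in userspace; a minimal Go sketch, illustrative only and not part of this change:

package main

import (
	"encoding/binary"
	"fmt"
)

// decodeDNSHeader reads the id and flags fields of the fixed 12-byte DNS
// header; both are big-endian on the wire, hence the bpf_ntohs calls in the
// eBPF code below.
func decodeDNSHeader(b []byte) (id, flags uint16, ok bool) {
	if len(b) < 12 {
		return 0, 0, false
	}
	return binary.BigEndian.Uint16(b[0:2]), binary.BigEndian.Uint16(b[2:4]), true
}

func main() {
	// sample header: id=0x1234, flags=0x8180 (QR bit set, i.e. a response)
	hdr := []byte{0x12, 0x34, 0x81, 0x80, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}
	id, flags, _ := decodeDNSHeader(hdr)
	fmt.Printf("id=%#04x response=%v\n", id, flags&0x8000 != 0) // id=0x1234 response=true
}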
static inline void find_or_create_dns_flow(flow_id *id, struct dns_header *dns, int len, int dir, u16 flags) {
    flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
    u64 current_time = bpf_ktime_get_ns();
    // The net_dev_queue tracepoint runs before the TC hooks, so the flow shouldn't exist yet.
    // If it does, we have either a stale DNS query/response or are in the middle of a TCP flow,
    // so we do nothing.
    if (aggregate_flow == NULL) {
        // no matching flow, so create a new one and record the DNS info
        flow_metrics new_flow;
        __builtin_memset(&new_flow, 0, sizeof(new_flow));
        new_flow.start_mono_time_ts = current_time;
        new_flow.end_mono_time_ts = current_time;
        new_flow.packets = 1;
        new_flow.bytes = len;
        new_flow.flags = flags;
        new_flow.dns_record.id = bpf_ntohs(dns->id);
        new_flow.dns_record.flags = bpf_ntohs(dns->flags);
        if (dir == EGRESS) {
            new_flow.dns_record.req_mono_time_ts = current_time;
        } else {
            new_flow.dns_record.rsp_mono_time_ts = current_time;
        }
        bpf_map_update_elem(&aggregated_flows, id, &new_flow, BPF_ANY);
    }
}
static inline int trace_dns(struct sk_buff *skb) {
    flow_id id;
    u8 protocol = 0;
    u16 family = 0, flags = 0, len = 0;

    __builtin_memset(&id, 0, sizeof(id));
    id.if_index = skb->skb_iif;
    // read L2 info
    set_key_with_l2_info(skb, &id, &family);
    // read L3 info
    set_key_with_l3_info(skb, family, &id, &protocol);
    switch (protocol) {
    case IPPROTO_UDP:
        len = set_key_with_udp_info(skb, &id, IPPROTO_UDP);
        // make sure the UDP payload doesn't exceed the max message size
        if (len - sizeof(struct udphdr) > UDP_MAXMSG) {
            return -1;
        }
        // set the length to the UDP header size; it is used below to locate the DNS header
        len = sizeof(struct udphdr);
        break;
    case IPPROTO_TCP:
        len = set_key_with_tcp_info(skb, &id, IPPROTO_TCP, &flags);
        break;
    default:
        return -1;
    }
    // check for DNS packets
    if (id.dst_port == DNS_PORT || id.src_port == DNS_PORT) {
        struct dns_header dns;
        bpf_probe_read(&dns, sizeof(dns), (struct dns_header *)(skb->head + skb->transport_header + len));
        if ((bpf_ntohs(dns.flags) & DNS_QR_FLAG) == 0) { /* DNS query */
            id.direction = EGRESS;
        } else { /* DNS response */
            id.direction = INGRESS;
        }
        find_or_create_dns_flow(&id, &dns, skb->len, id.direction, flags);
    } // end of DNS port check
    return 0;
}
SEC("tracepoint/net/net_dev_queue")
int trace_net_packets(struct trace_event_raw_net_dev_template *args) {
struct sk_buff skb;
__builtin_memset(&skb, 0, sizeof(skb));
bpf_probe_read(&skb, sizeof(struct sk_buff), args->skbaddr);
return trace_dns(&skb);
}
#endif // __DNS_TRACKER_H__
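Because the EGRESS query sets req_mono_time_ts and the INGRESS response sets rsp_mono_time_ts, DNS latency is the difference of the two monotonic timestamps. A minimal Go sketch over the BpfDnsRecordT binding generated later in this change; the dnsLatency helper is hypothetical and assumes both timestamps have been correlated into one record:

package main

import (
	"fmt"
	"time"
)

// BpfDnsRecordT mirrors the dns_record_t Go binding generated by bpf2go in this change.
type BpfDnsRecordT struct {
	Id            uint16
	Flags         uint16
	ReqMonoTimeTs uint64
	RspMonoTimeTs uint64
}

// dnsLatency is a hypothetical helper: it returns the query/response delta when
// both monotonic timestamps are populated, and false otherwise (e.g. a response
// observed without its query).
func dnsLatency(rec BpfDnsRecordT) (time.Duration, bool) {
	if rec.ReqMonoTimeTs == 0 || rec.RspMonoTimeTs == 0 || rec.RspMonoTimeTs < rec.ReqMonoTimeTs {
		return 0, false
	}
	return time.Duration(rec.RspMonoTimeTs - rec.ReqMonoTimeTs), true
}

func main() {
	rec := BpfDnsRecordT{Id: 0x1234, ReqMonoTimeTs: 1_000_000, RspMonoTimeTs: 3_500_000} // sample values
	if d, ok := dnsLatency(rec); ok {
		fmt.Printf("DNS id %#x answered in %v\n", rec.Id, d) // 2.5ms
	}
}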
@@ -10,6 +10,8 @@ typedef __u16 u16;
typedef __u32 u32;
typedef __u64 u64;
#define AF_INET 2
#define AF_INET6 10
#define ETH_ALEN 6
#define ETH_P_IP 0x0800
#define ETH_P_IPV6 0x86DD
@@ -30,8 +32,24 @@ typedef struct flow_metrics_t {
// 0 otherwise
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
    struct tcp_drops_t {
        u32 packets;
        u64 bytes;
        u16 latest_flags;
        u8 latest_state;
        u32 latest_drop_cause;
    } __attribute__((packed)) tcp_drops;
    struct dns_record_t {
        u16 id;
        u16 flags;
        u64 req_mono_time_ts;
        u64 rsp_mono_time_ts;
    } __attribute__((packed)) dns_record;
} __attribute__((packed)) flow_metrics;
// Force emitting struct tcp_drops into the ELF.
const struct tcp_drops_t *unused0 __attribute__((unused));
// Force emitting struct flow_metrics into the ELF.
const struct flow_metrics_t *unused1 __attribute__((unused));
@@ -71,4 +89,8 @@ typedef struct flow_record_t {
// Force emitting struct flow_record into the ELF.
const struct flow_record_t *unused3 __attribute__((unused));
// Force emitting struct dns_record into the ELF.
const struct dns_record_t *unused4 __attribute__((unused));
#endif
@@ -13,226 +13,10 @@
until an entry is available.
4) When a hash collision is detected, we send the new entry to userspace via the ringbuffer.
*/
#include <vmlinux.h>
#include <bpf_helpers.h>
#include "utils.h"
#include "tcp_drops.h"
#include "dns_tracker.h"
#include "flow.h"
#define DISCARD 1
#define SUBMIT 0
// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
#define INGRESS 0
#define EGRESS 1
// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml
#define FIN_FLAG 0x01
#define SYN_FLAG 0x02
#define RST_FLAG 0x04
#define PSH_FLAG 0x08
#define ACK_FLAG 0x10
#define URG_FLAG 0x20
#define ECE_FLAG 0x40
#define CWR_FLAG 0x80
// Custom flags exported
#define SYN_ACK_FLAG 0x100
#define FIN_ACK_FLAG 0x200
#define RST_ACK_FLAG 0x400
#if defined(__BYTE_ORDER__) && defined(__ORDER_LITTLE_ENDIAN__) && \
__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
#define bpf_ntohs(x) __builtin_bswap16(x)
#define bpf_htons(x) __builtin_bswap16(x)
#elif defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && \
__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
#define bpf_ntohs(x) (x)
#define bpf_htons(x) (x)
#else
# error "Endianness detection needs to be set up for your compiler?!"
#endif
// Common Ringbuffer as a conduit for ingress/egress flows to userspace
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24);
} direct_flows SEC(".maps");
// Key: the flow identifier. Value: the flow metrics for that identifier.
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, flow_id);
__type(value, flow_metrics);
__uint(max_entries, 1 << 24);
__uint(map_flags, BPF_F_NO_PREALLOC);
} aggregated_flows SEC(".maps");
// Constant definitions, to be overridden by the invoker
volatile const u32 sampling = 0;
volatile const u8 trace_messages = 0;
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};
// sets the TCP header flags for connection information
static inline void set_flags(struct tcphdr *th, u16 *flags) {
//If both ACK and SYN are set, then it is server -> client communication during 3-way handshake.
if (th->ack && th->syn) {
*flags |= SYN_ACK_FLAG;
} else if (th->ack && th->fin ) {
// If both ACK and FIN are set, then it is graceful termination from server.
*flags |= FIN_ACK_FLAG;
} else if (th->ack && th->rst ) {
// If both ACK and RST are set, then it is abrupt connection termination.
*flags |= RST_ACK_FLAG;
} else if (th->fin) {
*flags |= FIN_FLAG;
} else if (th->syn) {
*flags |= SYN_FLAG;
} else if (th->ack) {
*flags |= ACK_FLAG;
} else if (th->rst) {
*flags |= RST_FLAG;
} else if (th->psh) {
*flags |= PSH_FLAG;
} else if (th->urg) {
*flags |= URG_FLAG;
} else if (th->ece) {
*flags |= ECE_FLAG;
} else if (th->cwr) {
*flags |= CWR_FLAG;
}
}
// L4_info structure contains L4 headers parsed information.
struct l4_info_t {
// TCP/UDP/SCTP source port in host byte order
u16 src_port;
// TCP/UDP/SCTP destination port in host byte order
u16 dst_port;
// ICMPv4/ICMPv6 type value
u8 icmp_type;
// ICMPv4/ICMPv6 code value
u8 icmp_code;
// TCP flags
u16 flags;
};
// Extract L4 info for the supported protocols
static inline void fill_l4info(void *l4_hdr_start, void *data_end, u8 protocol,
struct l4_info_t *l4_info) {
switch (protocol) {
case IPPROTO_TCP: {
struct tcphdr *tcp = l4_hdr_start;
if ((void *)tcp + sizeof(*tcp) <= data_end) {
l4_info->src_port = bpf_ntohs(tcp->source);
l4_info->dst_port = bpf_ntohs(tcp->dest);
set_flags(tcp, &l4_info->flags);
}
} break;
case IPPROTO_UDP: {
struct udphdr *udp = l4_hdr_start;
if ((void *)udp + sizeof(*udp) <= data_end) {
l4_info->src_port = bpf_ntohs(udp->source);
l4_info->dst_port = bpf_ntohs(udp->dest);
}
} break;
case IPPROTO_SCTP: {
struct sctphdr *sctph = l4_hdr_start;
if ((void *)sctph + sizeof(*sctph) <= data_end) {
l4_info->src_port = bpf_ntohs(sctph->source);
l4_info->dst_port = bpf_ntohs(sctph->dest);
}
} break;
case IPPROTO_ICMP: {
struct icmphdr *icmph = l4_hdr_start;
if ((void *)icmph + sizeof(*icmph) <= data_end) {
l4_info->icmp_type = icmph->type;
l4_info->icmp_code = icmph->code;
}
} break;
case IPPROTO_ICMPV6: {
struct icmp6hdr *icmp6h = l4_hdr_start;
if ((void *)icmp6h + sizeof(*icmp6h) <= data_end) {
l4_info->icmp_type = icmp6h->icmp6_type;
l4_info->icmp_code = icmp6h->icmp6_code;
}
} break;
default:
break;
}
}
// sets flow fields from IPv4 header information
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) {
struct l4_info_t l4_info;
void *l4_hdr_start;
l4_hdr_start = (void *)ip + sizeof(*ip);
if (l4_hdr_start > data_end) {
return DISCARD;
}
__builtin_memset(&l4_info, 0, sizeof(l4_info));
__builtin_memcpy(id->src_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->dst_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->src_ip + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr));
__builtin_memcpy(id->dst_ip + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr));
id->transport_protocol = ip->protocol;
fill_l4info(l4_hdr_start, data_end, ip->protocol, &l4_info);
id->src_port = l4_info.src_port;
id->dst_port = l4_info.dst_port;
id->icmp_type = l4_info.icmp_type;
id->icmp_code = l4_info.icmp_code;
*flags = l4_info.flags;
return SUBMIT;
}
// sets flow fields from IPv6 header information
static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id, u16 *flags) {
struct l4_info_t l4_info;
void *l4_hdr_start;
l4_hdr_start = (void *)ip + sizeof(*ip);
if (l4_hdr_start > data_end) {
return DISCARD;
}
__builtin_memset(&l4_info, 0, sizeof(l4_info));
__builtin_memcpy(id->src_ip, ip->saddr.in6_u.u6_addr8, 16);
__builtin_memcpy(id->dst_ip, ip->daddr.in6_u.u6_addr8, 16);
id->transport_protocol = ip->nexthdr;
fill_l4info(l4_hdr_start, data_end, ip->nexthdr, &l4_info);
id->src_port = l4_info.src_port;
id->dst_port = l4_info.dst_port;
id->icmp_type = l4_info.icmp_type;
id->icmp_code = l4_info.icmp_code;
*flags = l4_info.flags;
return SUBMIT;
}
// sets flow fields from Ethernet header information
static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u16 *flags) {
if ((void *)eth + sizeof(*eth) > data_end) {
return DISCARD;
}
__builtin_memcpy(id->dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(id->src_mac, eth->h_source, ETH_ALEN);
id->eth_protocol = bpf_ntohs(eth->h_proto);
if (id->eth_protocol == ETH_P_IP) {
struct iphdr *ip = (void *)eth + sizeof(*eth);
return fill_iphdr(ip, data_end, id, flags);
} else if (id->eth_protocol == ETH_P_IPV6) {
struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth);
return fill_ip6hdr(ip6, data_end, id, flags);
} else {
// TODO : Need to implement other specific ethertypes if needed
// For now other parts of flow id remain zero
__builtin_memset(&(id->src_ip), 0, sizeof(struct in6_addr));
__builtin_memset(&(id->dst_ip), 0, sizeof(struct in6_addr));
id->transport_protocol = 0;
id->src_port = 0;
id->dst_port = 0;
}
return SUBMIT;
}
static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
@@ -317,4 +101,5 @@ SEC("tc_egress")
int egress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb, EGRESS);
}
char _license[] SEC("license") = "GPL";
#ifndef __MAPS_DEFINITION_H__
#define __MAPS_DEFINITION_H__
#include <vmlinux.h>
// Common Ringbuffer as a conduit for ingress/egress flows to userspace
struct {
    __uint(type, BPF_MAP_TYPE_RINGBUF);
    __uint(max_entries, 1 << 24);
} direct_flows SEC(".maps");

// Key: the flow identifier. Value: the flow metrics for that identifier.
struct {
    __uint(type, BPF_MAP_TYPE_HASH);
    __type(key, flow_id);
    __type(value, flow_metrics);
    __uint(max_entries, 1 << 24);
    __uint(map_flags, BPF_F_NO_PREALLOC);
} aggregated_flows SEC(".maps");
#endif //__MAPS_DEFINITION_H__
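Userspace drains direct_flows with the cilium/ebpf ringbuf reader, as the tracer later in this change does. A minimal Go sketch, assuming directFlows is the loaded map handle; the drainFlows helper is hypothetical:

package flows

import (
	"errors"
	"log"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/ringbuf"
)

// drainFlows reads raw flow_record_t samples from the direct_flows ringbuffer
// until the reader is closed.
func drainFlows(directFlows *ebpf.Map) error {
	rd, err := ringbuf.NewReader(directFlows)
	if err != nil {
		return err
	}
	defer rd.Close()
	for {
		rec, err := rd.Read()
		if err != nil {
			if errors.Is(err, ringbuf.ErrClosed) {
				return nil // normal shutdown
			}
			return err
		}
		// rec.RawSample holds the raw flow_record_t bytes still to be decoded
		log.Printf("flow record: %d bytes", len(rec.RawSample))
	}
}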
/*
    TCP drop tracking using tracepoints.
*/
#ifndef __TCP_DROPS_H__
#define __TCP_DROPS_H__
#include "utils.h"
static inline int trace_tcp_drop(void *ctx, struct sock *sk,
                                 struct sk_buff *skb,
                                 enum skb_drop_reason reason) {
    if (sk == NULL)
        return 0;

    flow_id id;
    __builtin_memset(&id, 0, sizeof(id));
    u8 state = 0, protocol = 0;
    u16 family = 0, flags = 0;

    // pull in details from the packet headers and the sock struct
    bpf_probe_read(&state, sizeof(u8), (u8 *)&sk->__sk_common.skc_state);
    id.if_index = skb->skb_iif;
    // read L2 info
    set_key_with_l2_info(skb, &id, &family);
    // read L3 info
    set_key_with_l3_info(skb, family, &id, &protocol);
    // we only support TCP drops; for any other protocol, return without doing anything
    if (protocol != IPPROTO_TCP) {
        return 0;
    }
    // read L4 info
    set_key_with_tcp_info(skb, &id, protocol, &flags);

    long ret = 0;
    for (direction_t dir = INGRESS; dir < MAX_DIRECTION; dir++) {
        id.direction = dir;
        ret = tcp_drop_lookup_and_update_flow(skb, &id, state, flags, reason);
        if (ret == 0) {
            return 0;
        }
    }
    // there is no matching flow, so create a new one and record the drop
    u64 current_time = bpf_ktime_get_ns();
    id.direction = INGRESS;
    flow_metrics new_flow = {
        .start_mono_time_ts = current_time,
        .end_mono_time_ts = current_time,
        .flags = flags,
        .tcp_drops.packets = 1,
        .tcp_drops.bytes = skb->len,
        .tcp_drops.latest_state = state,
        .tcp_drops.latest_flags = flags,
        .tcp_drops.latest_drop_cause = reason,
    };
    ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
    if (trace_messages && ret != 0) {
        bpf_printk("error tcp drop creating new flow %d\n", ret);
    }
    return ret;
}
SEC("tracepoint/skb/kfree_skb")
int kfree_skb(struct trace_event_raw_kfree_skb *args) {
struct sk_buff skb;
__builtin_memset(&skb, 0, sizeof(skb));
bpf_probe_read(&skb, sizeof(struct sk_buff), args->skbaddr);
struct sock *sk = skb.sk;
enum skb_drop_reason reason = args->reason;
// SKB_NOT_DROPPED_YET,
// SKB_CONSUMED,
// SKB_DROP_REASON_NOT_SPECIFIED,
if (reason > SKB_DROP_REASON_NOT_SPECIFIED) {
return trace_tcp_drop(args, sk, &skb, reason);
}
return 0;
}
#endif //__TCP_DROPS_H__
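Userspace can surface these counters, e.g. to flag flows where drops dominate. A minimal Go sketch over the BpfTcpDropsT binding generated later in this change; dropRatio and the sample values are hypothetical:

package main

import "fmt"

// BpfTcpDropsT mirrors the tcp_drops_t Go binding generated by bpf2go in this change.
type BpfTcpDropsT struct {
	Packets         uint32
	Bytes           uint64
	LatestFlags     uint16
	LatestState     uint8
	LatestDropCause uint32
}

// dropRatio is a hypothetical helper: dropped packets as a fraction of all
// packets attributed to the flow (forwarded + dropped).
func dropRatio(d BpfTcpDropsT, flowPackets uint32) float64 {
	total := flowPackets + d.Packets
	if total == 0 {
		return 0
	}
	return float64(d.Packets) / float64(total)
}

func main() {
	d := BpfTcpDropsT{Packets: 3, Bytes: 4380, LatestDropCause: 14} // sample values
	fmt.Printf("drop ratio %.2f, latest cause %d\n", dropRatio(d, 97), d.LatestDropCause) // drop ratio 0.03
}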
#ifndef __UTILS_H__
#define __UTILS_H__
#include <vmlinux.h>
#include <bpf_helpers.h>
#include "flow.h"
#include "maps_definition.h"
#include "configs.h"
#define DISCARD 1
#define SUBMIT 0
// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum {
INGRESS = 0,
EGRESS = 1,
MAX_DIRECTION = 2,
} direction_t;
// L4_info structure contains L4 headers parsed information.
struct l4_info_t {
// TCP/UDP/SCTP source port in host byte order
u16 src_port;
// TCP/UDP/SCTP destination port in host byte order
u16 dst_port;
// ICMPv4/ICMPv6 type value
u8 icmp_type;
// ICMPv4/ICMPv6 code value
u8 icmp_code;
// TCP flags
u16 flags;
};
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};
// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml
#define FIN_FLAG 0x01
#define SYN_FLAG 0x02
#define RST_FLAG 0x04
#define PSH_FLAG 0x08
#define ACK_FLAG 0x10
#define URG_FLAG 0x20
#define ECE_FLAG 0x40
#define CWR_FLAG 0x80
// Custom flags exported
#define SYN_ACK_FLAG 0x100
#define FIN_ACK_FLAG 0x200
#define RST_ACK_FLAG 0x400
#if defined(__BYTE_ORDER__) && defined(__ORDER_LITTLE_ENDIAN__) && \
__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
#define bpf_ntohs(x) __builtin_bswap16(x)
#define bpf_htons(x) __builtin_bswap16(x)
#elif defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && \
__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
#define bpf_ntohs(x) (x)
#define bpf_htons(x) (x)
#else
# error "Endianness detection needs to be set up for your compiler?!"
#endif
// sets the TCP header flags for connection information
static inline void set_flags(struct tcphdr *th, u16 *flags) {
//If both ACK and SYN are set, then it is server -> client communication during 3-way handshake.
if (th->ack && th->syn) {
*flags |= SYN_ACK_FLAG;
} else if (th->ack && th->fin ) {
// If both ACK and FIN are set, then it is graceful termination from server.
*flags |= FIN_ACK_FLAG;
} else if (th->ack && th->rst ) {
// If both ACK and RST are set, then it is abrupt connection termination.
*flags |= RST_ACK_FLAG;
} else if (th->fin) {
*flags |= FIN_FLAG;
} else if (th->syn) {
*flags |= SYN_FLAG;
} else if (th->ack) {
*flags |= ACK_FLAG;
} else if (th->rst) {
*flags |= RST_FLAG;
} else if (th->psh) {
*flags |= PSH_FLAG;
} else if (th->urg) {
*flags |= URG_FLAG;
} else if (th->ece) {
*flags |= ECE_FLAG;
} else if (th->cwr) {
*flags |= CWR_FLAG;
}
}
// Extract L4 info for the supported protocols
static inline void fill_l4info(void *l4_hdr_start, void *data_end, u8 protocol,
struct l4_info_t *l4_info) {
switch (protocol) {
case IPPROTO_TCP: {
struct tcphdr *tcp = l4_hdr_start;
if ((void *)tcp + sizeof(*tcp) <= data_end) {
l4_info->src_port = bpf_ntohs(tcp->source);
l4_info->dst_port = bpf_ntohs(tcp->dest);
set_flags(tcp, &l4_info->flags);
}
} break;
case IPPROTO_UDP: {
struct udphdr *udp = l4_hdr_start;
if ((void *)udp + sizeof(*udp) <= data_end) {
l4_info->src_port = bpf_ntohs(udp->source);
l4_info->dst_port = bpf_ntohs(udp->dest);
}
} break;
case IPPROTO_SCTP: {
struct sctphdr *sctph = l4_hdr_start;
if ((void *)sctph + sizeof(*sctph) <= data_end) {
l4_info->src_port = bpf_ntohs(sctph->source);
l4_info->dst_port = bpf_ntohs(sctph->dest);
}
} break;
case IPPROTO_ICMP: {
struct icmphdr *icmph = l4_hdr_start;
if ((void *)icmph + sizeof(*icmph) <= data_end) {
l4_info->icmp_type = icmph->type;
l4_info->icmp_code = icmph->code;
}
} break;
case IPPROTO_ICMPV6: {
struct icmp6hdr *icmp6h = l4_hdr_start;
if ((void *)icmp6h + sizeof(*icmp6h) <= data_end) {
l4_info->icmp_type = icmp6h->icmp6_type;
l4_info->icmp_code = icmp6h->icmp6_code;
}
} break;
default:
break;
}
}
// sets flow fields from IPv4 header information
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) {
struct l4_info_t l4_info;
void *l4_hdr_start;
l4_hdr_start = (void *)ip + sizeof(*ip);
if (l4_hdr_start > data_end) {
return DISCARD;
}
__builtin_memset(&l4_info, 0, sizeof(l4_info));
__builtin_memcpy(id->src_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->dst_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->src_ip + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr));
__builtin_memcpy(id->dst_ip + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr));
id->transport_protocol = ip->protocol;
fill_l4info(l4_hdr_start, data_end, ip->protocol, &l4_info);
id->src_port = l4_info.src_port;
id->dst_port = l4_info.dst_port;
id->icmp_type = l4_info.icmp_type;
id->icmp_code = l4_info.icmp_code;
*flags = l4_info.flags;
return SUBMIT;
}
// sets flow fields from IPv6 header information
static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id, u16 *flags) {
struct l4_info_t l4_info;
void *l4_hdr_start;
l4_hdr_start = (void *)ip + sizeof(*ip);
if (l4_hdr_start > data_end) {
return DISCARD;
}
__builtin_memset(&l4_info, 0, sizeof(l4_info));
__builtin_memcpy(id->src_ip, ip->saddr.in6_u.u6_addr8, IP_MAX_LEN);
__builtin_memcpy(id->dst_ip, ip->daddr.in6_u.u6_addr8, IP_MAX_LEN);
id->transport_protocol = ip->nexthdr;
fill_l4info(l4_hdr_start, data_end, ip->nexthdr, &l4_info);
id->src_port = l4_info.src_port;
id->dst_port = l4_info.dst_port;
id->icmp_type = l4_info.icmp_type;
id->icmp_code = l4_info.icmp_code;
*flags = l4_info.flags;
return SUBMIT;
}
// sets flow fields from Ethernet header information
static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u16 *flags) {
if ((void *)eth + sizeof(*eth) > data_end) {
return DISCARD;
}
__builtin_memcpy(id->dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(id->src_mac, eth->h_source, ETH_ALEN);
id->eth_protocol = bpf_ntohs(eth->h_proto);
if (id->eth_protocol == ETH_P_IP) {
struct iphdr *ip = (void *)eth + sizeof(*eth);
return fill_iphdr(ip, data_end, id, flags);
} else if (id->eth_protocol == ETH_P_IPV6) {
struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth);
return fill_ip6hdr(ip6, data_end, id, flags);
} else {
// TODO : Need to implement other specific ethertypes if needed
// For now other parts of flow id remain zero
__builtin_memset(&(id->src_ip), 0, sizeof(struct in6_addr));
__builtin_memset(&(id->dst_ip), 0, sizeof(struct in6_addr));
id->transport_protocol = 0;
id->src_port = 0;
id->dst_port = 0;
}
return SUBMIT;
}
static inline void set_key_with_l2_info(struct sk_buff *skb, flow_id *id, u16 *family) {
struct ethhdr eth;
__builtin_memset(&eth, 0, sizeof(eth));
bpf_probe_read(&eth, sizeof(eth), (struct ethhdr *)(skb->head + skb->mac_header));
id->eth_protocol = bpf_ntohs(eth.h_proto);
__builtin_memcpy(id->dst_mac, eth.h_dest, ETH_ALEN);
__builtin_memcpy(id->src_mac, eth.h_source, ETH_ALEN);
if (id->eth_protocol == ETH_P_IP) {
*family = AF_INET;
} else if (id->eth_protocol == ETH_P_IPV6) {
*family = AF_INET6;
}
}
static inline void set_key_with_l3_info(struct sk_buff *skb, u16 family, flow_id *id, u8 *protocol) {
if (family == AF_INET) {
struct iphdr ip;
__builtin_memset(&ip, 0, sizeof(ip));
bpf_probe_read(&ip, sizeof(ip), (struct iphdr *)(skb->head + skb->network_header));
__builtin_memcpy(id->src_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->dst_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->src_ip + sizeof(ip4in6), &ip.saddr, sizeof(ip.saddr));
__builtin_memcpy(id->dst_ip + sizeof(ip4in6), &ip.daddr, sizeof(ip.daddr));
*protocol = ip.protocol;
} else if (family == AF_INET6) {
struct ipv6hdr ip;
__builtin_memset(&ip, 0, sizeof(ip));
bpf_probe_read(&ip, sizeof(ip), (struct ipv6hdr *)(skb->head + skb->network_header));
__builtin_memcpy(id->src_ip, ip.saddr.in6_u.u6_addr8, IP_MAX_LEN);
__builtin_memcpy(id->dst_ip, ip.daddr.in6_u.u6_addr8, IP_MAX_LEN);
*protocol = ip.nexthdr;
}
}
static inline int set_key_with_tcp_info(struct sk_buff *skb, flow_id *id, u8 protocol, u16 *flags) {
u16 sport = 0,dport = 0;
struct tcphdr tcp;
__builtin_memset(&tcp, 0, sizeof(tcp));
bpf_probe_read(&tcp, sizeof(tcp), (struct tcphdr *)(skb->head + skb->transport_header));
sport = bpf_ntohs(tcp.source);
dport = bpf_ntohs(tcp.dest);
set_flags(&tcp, flags);
id->src_port = sport;
id->dst_port = dport;
id->transport_protocol = protocol;
return tcp.doff * sizeof(u32);
}
static inline int set_key_with_udp_info(struct sk_buff *skb, flow_id *id, u8 protocol) {
u16 sport = 0,dport = 0;
struct udphdr udp;
__builtin_memset(&udp, 0, sizeof(udp));
bpf_probe_read(&udp, sizeof(udp), (struct udphdr *)(skb->head + skb->transport_header));
sport = bpf_ntohs(udp.source);
dport = bpf_ntohs(udp.dest);
id->src_port = sport;
id->dst_port = dport;
id->transport_protocol = protocol;
return bpf_ntohs(udp.len);
}
static inline long tcp_drop_lookup_and_update_flow(struct sk_buff *skb, flow_id *id, u8 state, u16 flags,
enum skb_drop_reason reason) {
flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
if (aggregate_flow != NULL) {
aggregate_flow->tcp_drops.packets += 1;
aggregate_flow->tcp_drops.bytes += skb->len;
aggregate_flow->tcp_drops.latest_state = state;
aggregate_flow->tcp_drops.latest_flags = flags;
aggregate_flow->tcp_drops.latest_drop_cause = reason;
long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error tcp drop updating flow %d\n", ret);
}
return 0;
}
return -1;
}
#endif // __UTILS_H__
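The flags word set by set_flags (and exported as tcp_drops.latest_flags) packs the RFC 9293 bits plus the custom composite flags defined above. A small Go sketch decoding it, mirroring the constants in this header; the decodeFlags helper itself is hypothetical:

package main

import (
	"fmt"
	"strings"
)

// Bit values mirror the defines above: RFC 9293 flags plus the custom
// composite flags exported by the agent.
var flagNames = []struct {
	bit  uint16
	name string
}{
	{0x01, "FIN"}, {0x02, "SYN"}, {0x04, "RST"}, {0x08, "PSH"},
	{0x10, "ACK"}, {0x20, "URG"}, {0x40, "ECE"}, {0x80, "CWR"},
	{0x100, "SYN_ACK"}, {0x200, "FIN_ACK"}, {0x400, "RST_ACK"},
}

// decodeFlags renders a flags word as a readable list, e.g. 0x110 -> "ACK|SYN_ACK".
func decodeFlags(flags uint16) string {
	var names []string
	for _, f := range flagNames {
		if flags&f.bit != 0 {
			names = append(names, f.name)
		}
	}
	if len(names) == 0 {
		return "NONE"
	}
	return strings.Join(names, "|")
}

func main() {
	fmt.Println(decodeFlags(0x100)) // SYN_ACK
	fmt.Println(decodeFlags(0x11))  // FIN|ACK
}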
@@ -32,3 +32,12 @@ spec:
fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT
value: "9999"
volumeMounts:
  - name: bpf-kernel-debug
    mountPath: /sys/kernel/debug
    mountPropagation: Bidirectional
volumes:
  - name: bpf-kernel-debug
    hostPath:
      path: /sys/kernel/debug
      type: Directory
@@ -34,3 +34,12 @@ spec:
fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT
value: "9999"
volumeMounts:
  - name: bpf-kernel-debug
    mountPath: /sys/kernel/debug
    mountPropagation: Bidirectional
volumes:
  - name: bpf-kernel-debug
    hostPath:
      path: /sys/kernel/debug
      type: Directory
@@ -30,3 +30,12 @@ spec:
value: 200ms
- name: LOG_LEVEL
value: debug
volumeMounts:
  - name: bpf-kernel-debug
    mountPath: /sys/kernel/debug
    mountPropagation: Bidirectional
volumes:
  - name: bpf-kernel-debug
    hostPath:
      path: /sys/kernel/debug
      type: Directory
@@ -72,7 +72,7 @@ func main() {
for records := range receivedRecords {
for _, record := range records.Entries {
if record.EthProtocol == ipv6 {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsReq: %v dnsRsp: %v\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
@@ -81,16 +81,20 @@ func main() {
net.IP(record.Network.GetDstAddr().GetIpv6()).To16(),
record.Transport.DstPort,
protocolByNumber[record.Transport.Protocol],
record.Icmp.IcmpType,
record.Icmp.IcmpCode,
record.IcmpType,
record.IcmpCode,
record.Direction,
record.Bytes,
record.Packets,
record.Flags,
record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"),
record.GetDnsId(),
record.GetDnsFlags(),
record.GetTimeDnsReq(),
record.GetTimeDnsRsp(),
)
} else {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsReq: %v dnsRsp: %v\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
@@ -99,13 +103,17 @@ func main() {
ipIntToNetIP(record.Network.GetDstAddr().GetIpv4()).String(),
record.Transport.DstPort,
protocolByNumber[record.Transport.Protocol],
record.Icmp.IcmpType,
record.Icmp.IcmpCode,
record.IcmpType,
record.IcmpCode,
record.Direction,
record.Bytes,
record.Packets,
record.Flags,
record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"),
record.GetDnsId(),
record.GetDnsFlags(),
record.GetTimeDnsReq(),
record.GetTimeDnsRsp(),
)
}
}
......
@@ -122,7 +122,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
debug = true
}
fetcher, err := ebpf.NewFlowFetcher(debug, cfg.Sampling, cfg.CacheMaxFlows, ingress, egress)
fetcher, err := ebpf.NewFlowFetcher(debug, cfg.Sampling, cfg.CacheMaxFlows, ingress, egress, cfg.EnableTCPDrops, cfg.EnableDNSTracking)
if err != nil {
return nil, err
}
......
@@ -138,4 +138,8 @@ type Config struct {
ProfilePort int `env:"PROFILE_PORT"`
// EnableGC enables golang garbage collection run at the end of every map eviction, default is true
EnableGC bool `env:"ENABLE_GARBAGE_COLLECTION" envDefault:"true"`
// EnableTCPDrops enables the TCP drops eBPF hook to account for TCP-dropped flows
EnableTCPDrops bool `env:"ENABLE_TCP_DROPS" envDefault:"false"`
// EnableDNSTracking enables the DNS tracking eBPF hook to track DNS query/response flows
EnableDNSTracking bool `env:"ENABLE_DNS_TRACKING" envDefault:"false"`
}
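The env/envDefault struct tags above imply an environment-driven parser; a rough usage sketch assuming the github.com/caarlos0/env package, which matches the tag syntax but is not shown in this diff. Both hooks stay off unless explicitly enabled in the container environment:

package main

import (
	"fmt"
	"os"

	"github.com/caarlos0/env/v6" // assumed parser; the agent's actual import is not shown in this diff
)

type Config struct {
	EnableTCPDrops    bool `env:"ENABLE_TCP_DROPS" envDefault:"false"`
	EnableDNSTracking bool `env:"ENABLE_DNS_TRACKING" envDefault:"false"`
}

func main() {
	// both hooks are opt-in, toggled through the environment
	os.Setenv("ENABLE_TCP_DROPS", "true")
	os.Setenv("ENABLE_DNS_TRACKING", "true")

	var cfg Config
	if err := env.Parse(&cfg); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("tcpDrops=%v dnsTracking=%v\n", cfg.EnableTCPDrops, cfg.EnableDNSTracking)
}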
@@ -13,6 +13,13 @@ import (
"github.com/cilium/ebpf"
)
type BpfDnsRecordT struct {
Id uint16
Flags uint16
ReqMonoTimeTs uint64
RspMonoTimeTs uint64
}
type BpfFlowId BpfFlowIdT
type BpfFlowIdT struct {
@@ -39,6 +46,8 @@ type BpfFlowMetricsT struct {
EndMonoTimeTs uint64
Flags uint16
Errno uint8
TcpDrops BpfTcpDropsT
DnsRecord BpfDnsRecordT
}
type BpfFlowRecordT struct {
@@ -46,6 +55,14 @@ type BpfFlowRecordT struct {
Metrics BpfFlowMetrics
}
type BpfTcpDropsT struct {
Packets uint32
Bytes uint64
LatestFlags uint16
LatestState uint8
LatestDropCause uint32
}
// LoadBpf returns the embedded CollectionSpec for Bpf.
func LoadBpf() (*ebpf.CollectionSpec, error) {
reader := bytes.NewReader(_BpfBytes)
@@ -89,6 +106,8 @@ type BpfSpecs struct {
type BpfProgramSpecs struct {
EgressFlowParse *ebpf.ProgramSpec `ebpf:"egress_flow_parse"`
IngressFlowParse *ebpf.ProgramSpec `ebpf:"ingress_flow_parse"`
KfreeSkb *ebpf.ProgramSpec `ebpf:"kfree_skb"`
TraceNetPackets *ebpf.ProgramSpec `ebpf:"trace_net_packets"`
}
// BpfMapSpecs contains maps before they are loaded into the kernel.
@@ -135,12 +154,16 @@ func (m *BpfMaps) Close() error {
type BpfPrograms struct {
EgressFlowParse *ebpf.Program `ebpf:"egress_flow_parse"`
IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
KfreeSkb *ebpf.Program `ebpf:"kfree_skb"`
TraceNetPackets *ebpf.Program `ebpf:"trace_net_packets"`
}
func (p *BpfPrograms) Close() error {
return _BpfClose(
p.EgressFlowParse,
p.IngressFlowParse,
p.KfreeSkb,
p.TraceNetPackets,
)
}
......
No preview for this file type
@@ -13,6 +13,13 @@ import (
"github.com/cilium/ebpf"
)
type BpfDnsRecordT struct {
Id uint16
Flags uint16
ReqMonoTimeTs uint64
RspMonoTimeTs uint64
}
type BpfFlowId BpfFlowIdT
type BpfFlowIdT struct {
@@ -39,6 +46,8 @@ type BpfFlowMetricsT struct {
EndMonoTimeTs uint64
Flags uint16
Errno uint8
TcpDrops BpfTcpDropsT
DnsRecord BpfDnsRecordT
}
type BpfFlowRecordT struct {
@@ -46,6 +55,14 @@ type BpfFlowRecordT struct {
Metrics BpfFlowMetrics
}
type BpfTcpDropsT struct {
Packets uint32
Bytes uint64
LatestFlags uint16
LatestState uint8
LatestDropCause uint32
}
// LoadBpf returns the embedded CollectionSpec for Bpf.
func LoadBpf() (*ebpf.CollectionSpec, error) {
reader := bytes.NewReader(_BpfBytes)
@@ -89,6 +106,8 @@ type BpfSpecs struct {
type BpfProgramSpecs struct {
EgressFlowParse *ebpf.ProgramSpec `ebpf:"egress_flow_parse"`
IngressFlowParse *ebpf.ProgramSpec `ebpf:"ingress_flow_parse"`
KfreeSkb *ebpf.ProgramSpec `ebpf:"kfree_skb"`
TraceNetPackets *ebpf.ProgramSpec `ebpf:"trace_net_packets"`
}
// BpfMapSpecs contains maps before they are loaded into the kernel.
@@ -135,12 +154,16 @@ func (m *BpfMaps) Close() error {
type BpfPrograms struct {
EgressFlowParse *ebpf.Program `ebpf:"egress_flow_parse"`
IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
KfreeSkb *ebpf.Program `ebpf:"kfree_skb"`
TraceNetPackets *ebpf.Program `ebpf:"trace_net_packets"`
}
func (p *BpfPrograms) Close() error {
return _BpfClose(
p.EgressFlowParse,
p.IngressFlowParse,
p.KfreeSkb,
p.TraceNetPackets,
)
}
......
No preview for this file type
@@ -8,6 +8,7 @@ import (
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/btf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
@@ -17,7 +18,7 @@ )
)
// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -type flow_metrics_t -type flow_id_t -type flow_record_t Bpf ../../bpf/flows.c -- -I../../bpf/headers
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -type flow_metrics_t -type flow_id_t -type flow_record_t -type tcp_drops_t -type dns_record_t Bpf ../../bpf/flows.c -- -I../../bpf/headers
const (
qdiscType = "clsact"
@@ -34,20 +35,22 @@ var log = logrus.WithField("component", "ebpf.FlowFetcher")
// and to flows that are forwarded by the kernel via ringbuffer because they could not be aggregated
// in the map
type FlowFetcher struct {
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
ringbufReader *ringbuf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
objects *BpfObjects
qdiscs map[ifaces.Interface]*netlink.GenericQdisc
egressFilters map[ifaces.Interface]*netlink.BpfFilter
ingressFilters map[ifaces.Interface]*netlink.BpfFilter
ringbufReader *ringbuf.Reader
cacheMaxSize int
enableIngress bool
enableEgress bool
tcpDropsTracePoint link.Link
dnsTrackerTracePoint link.Link
}
func NewFlowFetcher(
traceMessages bool,
sampling, cacheMaxSize int,
ingress, egress bool,
ingress, egress, tcpDrops, dnsTracker bool,
) (*FlowFetcher, error) {
if err := rlimit.RemoveMemlock(); err != nil {
log.WithError(err).
@@ -73,6 +76,7 @@ }); err != nil {
}); err != nil {
return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
}
if err := spec.LoadAndAssign(&objects, nil); err != nil {
var ve *ebpf.VerifierError
if errors.As(err, &ve) {
@@ -88,20 +92,39 @@ func NewFlowFetcher(
* for more details.
*/
btf.FlushKernelSpec()
var tcpDropsLink link.Link
if tcpDrops {
tcpDropsLink, err = link.Tracepoint("skb", "kfree_skb", objects.KfreeSkb, nil)
if err != nil {
return nil, fmt.Errorf("failed to attach the BPF program to kfree_skb tracepoint: %w", err)
}
}
var dnsTrackerLink link.Link
if dnsTracker {
dnsTrackerLink, err = link.Tracepoint("net", "net_dev_queue", objects.TraceNetPackets, nil)
if err != nil {
return nil, fmt.Errorf("failed to attach the BPF program to trace_net_packets: %w", err)
}
}
// read events from the ingress+egress ringbuffer
flows, err := ringbuf.NewReader(objects.DirectFlows)
if err != nil {
return nil, fmt.Errorf("accessing to ringbuffer: %w", err)
}
return &FlowFetcher{
objects: &objects,
ringbufReader: flows,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cacheMaxSize,
enableIngress: ingress,
enableEgress: egress,
objects: &objects,
ringbufReader: flows,
egressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{},
qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{},
cacheMaxSize: cacheMaxSize,
enableIngress: ingress,
enableEgress: egress,
tcpDropsTracePoint: tcpDropsLink,
dnsTrackerTracePoint: dnsTrackerLink,
}, nil
}
@@ -217,6 +240,14 @@ func (m *FlowFetcher) Close() error {
log.Debug("unregistering eBPF objects")
var errs []error
if m.tcpDropsTracePoint != nil {
m.tcpDropsTracePoint.Close()
}
if m.dnsTrackerTracePoint != nil {
m.dnsTrackerTracePoint.Close()
}
// m.ringbufReader.Read is a blocking operation, so we need to close the ring buffer
// from another goroutine to avoid the system not being able to exit if there
// isn't traffic in a given interface
......
@@ -65,7 +65,7 @@ func TestProtoConversion(t *testing.T) {
assert.EqualValues(t, 4321, r.Transport.SrcPort)
assert.EqualValues(t, 1234, r.Transport.DstPort)
assert.EqualValues(t, 210, r.Transport.Protocol)
assert.EqualValues(t, 8, r.Icmp.IcmpType)
assert.EqualValues(t, 8, r.IcmpType)
assert.Equal(t, record.TimeFlowStart.UnixMilli(), r.TimeFlowStart.AsTime().UnixMilli())
assert.Equal(t, record.TimeFlowEnd.UnixMilli(), r.TimeFlowEnd.AsTime().UnixMilli())
assert.EqualValues(t, 789, r.Bytes)
......
@@ -38,7 +38,7 @@ func flowToPB(record *flow.Record) *pbflow.Record {
}
func v4FlowToPB(fr *flow.Record) *pbflow.Record {
return &pbflow.Record{
var pbflowRecord = pbflow.Record{
EthProtocol: uint32(fr.Id.EthProtocol),
Direction: pbflow.Direction(fr.Id.Direction),
DataLink: &pbflow.DataLink{
@@ -54,11 +54,9 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
SrcPort: uint32(fr.Id.SrcPort),
DstPort: uint32(fr.Id.DstPort),
},
Icmp: &pbflow.Icmp{
IcmpType: uint32(fr.Id.IcmpType),
IcmpCode: uint32(fr.Id.IcmpCode),
},
Bytes: fr.Metrics.Bytes,
IcmpType: uint32(fr.Id.IcmpType),
IcmpCode: uint32(fr.Id.IcmpCode),
Bytes: fr.Metrics.Bytes,
TimeFlowStart: &timestamppb.Timestamp{
Seconds: fr.TimeFlowStart.Unix(),
Nanos: int32(fr.TimeFlowStart.Nanosecond()),
@@ -67,16 +65,36 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
Seconds: fr.TimeFlowEnd.Unix(),
Nanos: int32(fr.TimeFlowEnd.Nanosecond()),
},
Packets: uint64(fr.Metrics.Packets),
Duplicate: fr.Duplicate,
AgentIp: agentIP(fr.AgentIP),
Flags: uint32(fr.Metrics.Flags),
Interface: string(fr.Interface),
Packets: uint64(fr.Metrics.Packets),
Duplicate: fr.Duplicate,
AgentIp: agentIP(fr.AgentIP),
Flags: uint32(fr.Metrics.Flags),
Interface: string(fr.Interface),
TcpDropBytes: fr.Metrics.TcpDrops.Bytes,
TcpDropPackets: uint64(fr.Metrics.TcpDrops.Packets),
TcpDropLatestFlags: uint32(fr.Metrics.TcpDrops.LatestFlags),
TcpDropLatestState: uint32(fr.Metrics.TcpDrops.LatestState),
TcpDropLatestDropCause: fr.Metrics.TcpDrops.LatestDropCause,
DnsId: uint32(fr.Metrics.DnsRecord.Id),
DnsFlags: uint32(fr.Metrics.DnsRecord.Flags),
}
if fr.Metrics.DnsRecord.ReqMonoTimeTs != 0 {
pbflowRecord.TimeDnsReq = &timestamppb.Timestamp{
Seconds: fr.TimeDNSRequest.Unix(),
Nanos: int32(fr.TimeDNSRequest.Nanosecond()),
}
}
if fr.Metrics.DnsRecord.RspMonoTimeTs != 0 {
pbflowRecord.TimeDnsRsp = &timestamppb.Timestamp{
Seconds: fr.TimeDNSResponse.Unix(),
Nanos: int32(fr.TimeDNSResponse.Nanosecond()),
}
}
return &pbflowRecord
}
func v6FlowToPB(fr *flow.Record) *pbflow.Record {
return &pbflow.Record{
var pbflowRecord = pbflow.Record{
EthProtocol: uint32(fr.Id.EthProtocol),
Direction: pbflow.Direction(fr.Id.Direction),
DataLink: &pbflow.DataLink{
@@ -92,11 +110,9 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
SrcPort: uint32(fr.Id.SrcPort),
DstPort: uint32(fr.Id.DstPort),
},
Icmp: &pbflow.Icmp{
IcmpType: uint32(fr.Id.IcmpType),
IcmpCode: uint32(fr.Id.IcmpCode),
},
Bytes: fr.Metrics.Bytes,
IcmpType: uint32(fr.Id.IcmpType),
IcmpCode: uint32(fr.Id.IcmpCode),
Bytes: fr.Metrics.Bytes,
TimeFlowStart: &timestamppb.Timestamp{
Seconds: fr.TimeFlowStart.Unix(),
Nanos: int32(fr.TimeFlowStart.Nanosecond()),
@@ -105,12 +121,32 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
Seconds: fr.TimeFlowEnd.Unix(),
Nanos: int32(fr.TimeFlowEnd.Nanosecond()),
},
Packets: uint64(fr.Metrics.Packets),
Flags: uint32(fr.Metrics.Flags),
Interface: fr.Interface,
Duplicate: fr.Duplicate,
AgentIp: agentIP(fr.AgentIP),
Packets: uint64(fr.Metrics.Packets),
Flags: uint32(fr.Metrics.Flags),
Interface: fr.Interface,
Duplicate: fr.Duplicate,
AgentIp: agentIP(fr.AgentIP),
TcpDropBytes: fr.Metrics.TcpDrops.Bytes,
TcpDropPackets: uint64(fr.Metrics.TcpDrops.Packets),
TcpDropLatestFlags: uint32(fr.Metrics.TcpDrops.LatestFlags),
TcpDropLatestState: uint32(fr.Metrics.TcpDrops.LatestState),
TcpDropLatestDropCause: fr.Metrics.TcpDrops.LatestDropCause,
DnsId: uint32(fr.Metrics.DnsRecord.Id),
DnsFlags: uint32(fr.Metrics.DnsRecord.Flags),
}
if fr.Metrics.DnsRecord.ReqMonoTimeTs != 0 {
pbflowRecord.TimeDnsReq = &timestamppb.Timestamp{
Seconds: fr.TimeDNSRequest.Unix(),
Nanos: int32(fr.TimeDNSRequest.Nanosecond()),
}
}
if fr.Metrics.DnsRecord.RspMonoTimeTs != 0 {
pbflowRecord.TimeDnsRsp = &timestamppb.Timestamp{
Seconds: fr.TimeDNSResponse.Unix(),
Nanos: int32(fr.TimeDNSResponse.Nanosecond()),
}
}
return &pbflowRecord
}
// Mac bytes are encoded in the same order as in the array. This is, a Mac
......