Skip to content
Snippets Groups Projects
Unverified Commit 60194d77 authored by Mohamed S. Mahmoud's avatar Mohamed S. Mahmoud Committed by GitHub
Browse files

NETOBSERV-1478: Add eBPF Agent filtering capability (#307)


* ebpf changes to support rule based flow filtering feature

Signed-off-by: default avatarMohamed Mahmoud <mmahmoud@redhat.com>

* flow filter feature userspace code

Signed-off-by: default avatarMohamed Mahmoud <mmahmoud@redhat.com>

* add unit test for userspace flow filter module

Signed-off-by: default avatarMohamed Mahmoud <mmahmoud@redhat.com>

* add flow filter doc and show some use cases

Signed-off-by: default avatarMohamed Mahmoud <mmahmoud@redhat.com>

* create specific metics for flow filtering instead of using drop flows metric

Signed-off-by: default avatarMohamed Mahmoud <mmahmoud@redhat.com>

---------

Signed-off-by: default avatarMohamed Mahmoud <mmahmoud@redhat.com>
parent f9fa0a7d
Branches
Tags
No related merge requests found
Showing
with 1065 additions and 25 deletions
...@@ -9,4 +9,5 @@ volatile const u8 enable_rtt = 0; ...@@ -9,4 +9,5 @@ volatile const u8 enable_rtt = 0;
volatile const u16 pca_port = 0; volatile const u16 pca_port = 0;
volatile const u8 pca_proto = 0; volatile const u8 pca_proto = 0;
volatile const u8 enable_dns_tracking = 0; volatile const u8 enable_dns_tracking = 0;
volatile const u8 enable_flows_filtering = 0;
#endif //__CONFIGS_H__ #endif //__CONFIGS_H__
...@@ -39,7 +39,11 @@ ...@@ -39,7 +39,11 @@
*/ */
#include "pca.h" #include "pca.h"
/* Do flow filtering. Is optional. */
#include "flows_filter.h"
static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
filter_action action = ACCEPT;
// If sampling is defined, will only parse 1 out of "sampling" flows // If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) { if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
return TC_ACT_OK; return TC_ACT_OK;
...@@ -66,6 +70,57 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { ...@@ -66,6 +70,57 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
id.if_index = skb->ifindex; id.if_index = skb->ifindex;
id.direction = direction; id.direction = direction;
// check if this packet need to be filtered if filtering feature is enabled
if (enable_flows_filtering) {
u32* filter_counter_p = NULL;
u32 initVal = 1, key = 0;
if (is_flow_filtered(&id, &action) != 0 && action != MAX_FILTER_ACTIONS) {
// we have matching rules follow through the actions to decide if we should accept or reject the flow
// and update global counter for both cases
u32 reject_key = FILTER_FLOWS_REJECT_KEY, accept_key = FILTER_FLOWS_ACCEPT_KEY;
bool skip = false;
switch (action) {
case REJECT:
key = reject_key;
skip = true;
break;
case ACCEPT:
key = accept_key;
break;
// should never come here
case MAX_FILTER_ACTIONS:
return TC_ACT_OK;
}
// update global counter for flows dropped by filter
filter_counter_p = bpf_map_lookup_elem(&global_counters, &key);
if (!filter_counter_p) {
bpf_map_update_elem(&global_counters, &key, &initVal, BPF_ANY);
} else {
__sync_fetch_and_add(filter_counter_p, 1);
}
if (skip) {
return TC_ACT_OK;
}
} else {
// we have no matching rules so we update global counter for flows that are not matched by any rule
key = FILTER_FLOWS_NOMATCH_KEY;
filter_counter_p = bpf_map_lookup_elem(&global_counters, &key);
if (!filter_counter_p) {
bpf_map_update_elem(&global_counters, &key, &initVal, BPF_ANY);
} else {
__sync_fetch_and_add(filter_counter_p, 1);
}
// we have accept rule but no match so we can't let mismatched flows in the hashmap table.
if (action == ACCEPT || action == MAX_FILTER_ACTIONS) {
return TC_ACT_OK;
} else {
// we have reject rule and no match so we can add the flows to the hashmap table.
}
}
}
int dns_errno = 0; int dns_errno = 0;
if (enable_dns_tracking) { if (enable_dns_tracking) {
dns_errno = track_dns_packet(skb, &pkt); dns_errno = track_dns_packet(skb, &pkt);
......
/*
rule based filter to filter out packets not of interest to users.
*/
#ifndef __FLOWS_FILTER_H__
#define __FLOWS_FILTER_H__
#include "utils.h"
// remove the comment below to enable debug prints
//#define ENABLE_BPF_PRINTK
#ifdef ENABLE_BPF_PRINTK
#define BPF_PRINTK(fmt, args...) bpf_printk(fmt, ##args)
#else
#define BPF_PRINTK(fmt, args...)
#endif
static __always_inline int is_zero_ip(u8 *ip, u8 len) {
for (int i = 0; i < len; i++) {
if (ip[i] != 0) {
BPF_PRINTK("ip not zero ip[%d]:%d\n", i, ip[i]);
return 0;
}
}
return 1;
}
static __always_inline int is_equal_ip(u8 *ip1, u8 *ip2, u8 len) {
for (int i = 0; i < len; i++) {
if (ip1[i] != ip2[i]) {
BPF_PRINTK("ip mismatched ip1[%d]:%d not equal to ip2[%d]:%d\n", i, ip1[i], i, ip2[i]);
return 0;
}
}
return 1;
}
static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_t *key, filter_action *action,
u8 len, u8 offset) {
int result = 0;
struct filter_value_t *rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, key);
if (rule) {
BPF_PRINTK("rule found\n");
result++;
if (rule->action != MAX_FILTER_ACTIONS) {
BPF_PRINTK("action matched: %d\n", rule->action);
*action = rule->action;
result++;
}
if (rule->protocol != 0) {
if (rule->protocol == id->transport_protocol) {
BPF_PRINTK("protocol matched\n");
result++;
switch (rule->protocol) {
case IPPROTO_TCP:
case IPPROTO_UDP:
case IPPROTO_SCTP:
// dstPort matching
if (rule->dstPortStart != 0 && rule->dstPortEnd == 0) {
if (rule->dstPortStart == id->dst_port) {
BPF_PRINTK("dstPortStart matched\n");
result++;
} else {
result = 0;
goto end;
}
} else if (rule->dstPortStart != 0 && rule->dstPortEnd != 0) {
if (rule->dstPortStart <= id->dst_port && id->dst_port <= rule->dstPortEnd) {
BPF_PRINTK("dstPortStart and dstPortEnd matched\n");
result++;
} else {
result = 0;
goto end;
}
}
// srcPort matching
if (rule->srcPortStart != 0 && rule->srcPortEnd == 0) {
if (rule->srcPortStart == id->src_port) {
BPF_PRINTK("srcPortStart matched\n");
result++;
} else {
result = 0;
goto end;
}
} else if (rule->srcPortStart != 0 && rule->srcPortEnd != 0) {
if (rule->srcPortStart <= id->src_port && id->src_port <= rule->srcPortEnd) {
BPF_PRINTK("srcPortStart and srcPortEnd matched\n");
result++;
} else {
result = 0;
goto end;
}
}
// Generic port matching check for either src or dst port
if (rule->portStart != 0 && rule->portEnd == 0) {
if (rule->portStart == id->src_port || rule->portStart == id->dst_port) {
BPF_PRINTK("portStart matched\n");
result++;
} else {
result = 0;
goto end;
}
} else if (rule->portStart != 0 && rule->portEnd != 0) {
if ((rule->portStart <= id->src_port && id->src_port <= rule->portEnd) ||
(rule->portStart <= id->dst_port && id->dst_port <= rule->portEnd)) {
BPF_PRINTK("portStart and portEnd matched\n");
result++;
} else {
result = 0;
goto end;
}
}
break;
case IPPROTO_ICMP:
case IPPROTO_ICMPV6:
if (rule->icmpType != 0) {
if (rule->icmpType == id->icmp_type) {
BPF_PRINTK("icmpType matched\n");
result++;
} else {
result = 0;
goto end;
}
if (rule->icmpCode != 0) {
if (rule->icmpCode == id->icmp_code) {
BPF_PRINTK("icmpCode matched\n");
result++;
} else {
result = 0;
goto end;
}
}
}
break;
}
} else {
result = 0;
goto end;
}
}
if (!is_zero_ip(rule->ip, len)) {
// for Ingress side we can filter using dstIP and for Egress side we can filter using srcIP
if (id->direction == INGRESS) {
if (is_equal_ip(rule->ip, id->dst_ip + offset, len)) {
BPF_PRINTK("dstIP matched\n");
result++;
} else {
result = 0;
goto end;
}
} else {
if (is_equal_ip(rule->ip, id->src_ip + offset, len)) {
BPF_PRINTK("srcIP matched\n");
result++;
} else {
result = 0;
goto end;
}
}
}
if (rule->direction != MAX_DIRECTION) {
if (rule->direction == id->direction) {
BPF_PRINTK("direction matched\n");
result++;
} else {
result = 0;
goto end;
}
}
}
end:
BPF_PRINTK("result: %d action %d\n", result, *action);
return result;
}
static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filter_key_t *key,
u8 *len, u8 *offset, bool use_src_ip) {
if (id->eth_protocol == ETH_P_IP) {
*len = sizeof(u32);
*offset = sizeof(ip4in6);
if (use_src_ip) {
__builtin_memcpy(key->ip_data, id->src_ip + *offset, *len);
} else {
__builtin_memcpy(key->ip_data, id->dst_ip + *offset, *len);
}
key->prefix_len = 32;
} else if (id->eth_protocol == ETH_P_IPV6) {
*len = IP_MAX_LEN;
*offset = 0;
if (use_src_ip) {
__builtin_memcpy(key->ip_data, id->src_ip + *offset, *len);
} else {
__builtin_memcpy(key->ip_data, id->dst_ip + *offset, *len);
}
key->prefix_len = 128;
} else {
return -1;
}
return 0;
}
/*
* check if the flow match filter rule and return >= 1 if the flow is to be dropped
*/
static __always_inline int is_flow_filtered(flow_id *id, filter_action *action) {
struct filter_key_t key;
u8 len, offset;
int result = 0;
__builtin_memset(&key, 0, sizeof(key));
*action = MAX_FILTER_ACTIONS;
// Lets do first CIDR match using srcIP.
result = flow_filter_setup_lookup_key(id, &key, &len, &offset, true);
if (result < 0) {
return result;
}
result = do_flow_filter_lookup(id, &key, action, len, offset);
// we have a match so return
if (result > 0) {
return result;
}
// if we can't find a match then Lets do second CIDR match using dstIP.
result = flow_filter_setup_lookup_key(id, &key, &len, &offset, false);
if (result < 0) {
return result;
}
return do_flow_filter_lookup(id, &key, action, len, offset);
}
#endif //__FLOWS_FILTER_H__
...@@ -41,7 +41,16 @@ struct { ...@@ -41,7 +41,16 @@ struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, u32); __type(key, u32);
__type(value, u32); __type(value, u32);
__uint(max_entries, 1); __uint(max_entries, MAX_DROPPED_FLOWS_KEY);
} global_counters SEC(".maps"); } global_counters SEC(".maps");
// LPM trie map used to filter traffic by IP address CIDR and direction
struct {
__uint(type, BPF_MAP_TYPE_LPM_TRIE);
__type(key, struct filter_key_t);
__type(value, struct filter_value_t);
__uint(max_entries, MAX_FILTER_ENTRIES);
__uint(map_flags, BPF_F_NO_PREALLOC);
} filter_map SEC(".maps");
#endif //__MAPS_DEFINITION_H__ #endif //__MAPS_DEFINITION_H__
...@@ -49,7 +49,7 @@ static inline int trace_pkt_drop(void *ctx, u8 state, ...@@ -49,7 +49,7 @@ static inline int trace_pkt_drop(void *ctx, u8 state,
} }
long ret = 0; long ret = 0;
for (direction_t dir = INGRESS; dir < MAX_DIRECTION; dir++) { for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
id.direction = dir; id.direction = dir;
ret = pkt_drop_lookup_and_update_flow(skb, &id, state, flags, reason); ret = pkt_drop_lookup_and_update_flow(skb, &id, state, flags, reason);
if (ret == 0) { if (ret == 0) {
......
...@@ -54,12 +54,17 @@ typedef __u64 u64; ...@@ -54,12 +54,17 @@ typedef __u64 u64;
#define DSCP_SHIFT 2 #define DSCP_SHIFT 2
#define DSCP_MASK 0x3F #define DSCP_MASK 0x3F
#define MIN_RTT 10000u //10us #define MIN_RTT 10000u //10us
#define MAX_FILTER_ENTRIES 1 // we have only one global filter
// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml // according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum { typedef enum direction_t {
INGRESS = 0, INGRESS = 0,
EGRESS = 1, EGRESS = 1,
MAX_DIRECTION = 2, MAX_DIRECTION = 2,
} direction_t; } direction;
// Force emitting enum direction_t into the ELF.
const enum direction_t *unused8 __attribute__((unused));
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};
...@@ -126,18 +131,6 @@ typedef struct flow_id_t { ...@@ -126,18 +131,6 @@ typedef struct flow_id_t {
// Force emitting struct flow_id into the ELF. // Force emitting struct flow_id into the ELF.
const struct flow_id_t *unused2 __attribute__((unused)); const struct flow_id_t *unused2 __attribute__((unused));
// Standard 4 tuple, transport protocol and a sequence identifier.
// No need to emit this struct. It's used only in kernel space
typedef struct flow_seq_id_t {
u16 src_port;
u16 dst_port;
u8 src_ip[IP_MAX_LEN];
u8 dst_ip[IP_MAX_LEN];
u32 seq_id;
u8 transport_protocol;
u32 if_index; // OS interface index
} __attribute__((packed)) flow_seq_id;
// Flow record is a tuple containing both flow identifier and metrics. It is used to send // Flow record is a tuple containing both flow identifier and metrics. It is used to send
// a complete flow via ring buffer when only when the accounting hashmap is full. // a complete flow via ring buffer when only when the accounting hashmap is full.
// Contents in this struct must match byte-by-byte with Go's pkc/flow/Record struct // Contents in this struct must match byte-by-byte with Go's pkc/flow/Record struct
...@@ -183,11 +176,51 @@ typedef struct dns_flow_id_t { ...@@ -183,11 +176,51 @@ typedef struct dns_flow_id_t {
// Enum to define global counters keys and share it with userspace // Enum to define global counters keys and share it with userspace
typedef enum global_counters_key_t { typedef enum global_counters_key_t {
HASHMAP_FLOWS_DROPPED_KEY = 0, HASHMAP_FLOWS_DROPPED_KEY = 0,
FILTER_FLOWS_REJECT_KEY = 1,
FILTER_FLOWS_ACCEPT_KEY = 2,
FILTER_FLOWS_NOMATCH_KEY = 3,
MAX_DROPPED_FLOWS_KEY = 4,
} global_counters_key; } global_counters_key;
// Force emitting enum global_counters_key_t into the ELF. // Force emitting enum global_counters_key_t into the ELF.
const enum global_counters_key_t *unused5 __attribute__((unused)); const enum global_counters_key_t *unused5 __attribute__((unused));
// filter key used as key to LPM map to filter out flows that are not interesting for the user
struct filter_key_t {
u32 prefix_len;
u8 ip_data[IP_MAX_LEN];
} __attribute__((packed));
// Force emitting struct filter_key_t into the ELF.
const struct filter_key_t *unused6 __attribute__((unused));
// Enum to define filter action
typedef enum filter_action_t {
ACCEPT = 0,
REJECT = 1,
MAX_FILTER_ACTIONS = 2,
} filter_action;
// Force emitting enum direction_t into the ELF.
const enum filter_action_t *unused7 __attribute__((unused));
// filter value used as value from LPM map lookup to filter out flows that are not interesting for the user
struct filter_value_t {
u8 protocol;
u16 dstPortStart;
u16 dstPortEnd;
u16 srcPortStart;
u16 srcPortEnd;
u16 portStart;
u16 portEnd;
u8 icmpType;
u8 icmpCode;
direction direction;
filter_action action;
u8 ip[IP_MAX_LEN];
} __attribute__((packed));
// Force emitting struct filter_value_t into the ELF.
const struct filter_value_t *unused9 __attribute__((unused));
#endif /* __TYPES_H__ */ #endif /* __TYPES_H__ */
...@@ -71,11 +71,34 @@ The following environment variables are available to configure the NetObserv eBF ...@@ -71,11 +71,34 @@ The following environment variables are available to configure the NetObserv eBF
If it is not set, profile is disabled. If it is not set, profile is disabled.
* `ENABLE_RTT` (default: `false` disabled). If `true` enables RTT calculations for the captured flows in the ebpf agent. * `ENABLE_RTT` (default: `false` disabled). If `true` enables RTT calculations for the captured flows in the ebpf agent.
See [docs](./rtt_calculations.md) for more details on this feature. See [docs](./rtt_calculations.md) for more details on this feature.
* `ENABLE_PKT_DROPS` (default: `false` disabled). If `true` enables packet drops eBPF hook to be able to capture drops flows in the ebpf agent.
* `ENABLE_DNS_TRACKING` (default: `false` disabled). If `true` enables DNS tracking to calculate DNS latency for the captured flows in the ebpf agent.
* `ENABLE_PCA` (default: `false` disabled). If `true` enables Packet Capture Agent. * `ENABLE_PCA` (default: `false` disabled). If `true` enables Packet Capture Agent.
* `PCA_FILTER` (default: `none`). Works only when `ENABLE_PCA` is set. Accepted format <protocol,portnumber>. Example * `PCA_FILTER` (default: `none`). Works only when `ENABLE_PCA` is set. Accepted format <protocol,portnumber>. Example
`PCA_FILTER=tcp,22`. `PCA_FILTER=tcp,22`.
* `PCA_SERVER_PORT` (default: 0). Works only when `ENABLE_PCA` is set. Agent opens PCA Server at this port. A collector can connect to it and recieve filtered packets as pcap stream. The filter is set using `PCA_FILTER`. * `PCA_SERVER_PORT` (default: 0). Works only when `ENABLE_PCA` is set. Agent opens PCA Server at this port. A collector can connect to it and recieve filtered packets as pcap stream. The filter is set using `PCA_FILTER`.
* `FLP_CONFIG`: [flowlogs-pipeline](https://github.com/netobserv/flowlogs-pipeline) configuration as YAML or JSON, used when `EXPORT` is `direct-flp`. The ingest stage must be omitted from this configuration, since it is handled internally by the agent. The first stage should follow "preset-ingester". E.g, for a minimal configuration printing on terminal: `{"pipeline":[{"name": "writer","follows": "preset-ingester"}],"parameters":[{"name": "writer","write": {"type": "stdout"}}]}`. Refer to flowlogs-pipeline documentation for more options. * `FLP_CONFIG`: [flowlogs-pipeline](https://github.com/netobserv/flowlogs-pipeline) configuration as YAML or JSON, used when `EXPORT` is `direct-flp`. The ingest stage must be omitted from this configuration, since it is handled internally by the agent. The first stage should follow "preset-ingester". E.g, for a minimal configuration printing on terminal: `{"pipeline":[{"name": "writer","follows": "preset-ingester"}],"parameters":[{"name": "writer","write": {"type": "stdout"}}]}`. Refer to flowlogs-pipeline documentation for more options.
* `METRICS_ENABLED` (default: `false`). If `true`, the agent will export metrics to the configured `EXPORT` endpoint.
* `METRICS_SERVER_ADDRESS` Address of the server where the metrics will be exported.
* `METRICS_SERVER_PORT` (default: 9090). Port of the server where the metrics will be exported.
* `METRICS_TLS_CERT_PATH` (default: unset). Path to the certificate file for the TLS connection.
* `METRICS_TLS_KEY_PATH` (default: unset). Path to the private key file for the TLS connection.
* `METRICS_PREFIX` (default: `ebpf-agent`). Prefix for the exported metrics.
* `ENABLE_FLOW_FILTER` (default: `false`). If `true`, the agent will filter flows based on the configured `FLOW_FILTER`.
See [docs](./flow_filtering.md) for more details on this feature.
* `FLOW_FILTER_DIRECTION` (default: unset). Direction of the flows to be filtered. Accepted values are `ingress`, `egress`, this is optional configuration.
* `FLOW_FILTER_IP_CIDR` (default: unset). IP CIDR to be filtered. Accepted format: `192.168.1.0/24` this field is mandatory.
* `FLOW_FILTER_PROTOCOL` (default: unset). Protocol to be filtered. Accepted values are `TCP`, `UDP`, `SCTP`, `ICMP`, `ICMPv6`, this is optional configuration.
* `FLOW_FILTER_SOURCE_PORT` (default: unset). Source port to be filtered. Accepted format: `80` this field is optional.
* `FLOW_FILTER_DESTINATION_PORT` (default: unset). Destination port to be filtered. Accepted format: `80` this field is optional.
* `FLOW_FILTER_PORT` (default: unset). Port to be filtered can be either source or dst port. Accepted format: `80` this field is optional.
* `FLOW_FILTER_SOURCE_PORT_RANGE` (default: unset). Source port range to be filtered. Accepted format: `80-90` this field is optional.
* `FLOW_FILTER_DESTINATION_PORT_RANGE` (default: unset). Destination port range to be filtered. Accepted format: `80-90` this field is optional.
* `FLOW_FILTER_PORT_RANGE` (default: unset). Port range to be filtered can be either source or dst port. Accepted format: `80-90` this field
* `FLOW_FILTER_ICMP_TYPE` (default: unset). ICMP type to be filtered. Accepted format: `8` this field is optional.
* `FLOW_FILTER_ICMP_CODE` (default: unset). ICMP code to be filtered. Accepted format: `8` this field is optional.
* `FLOW_FILTER_PEER_IP` (default: unset). Peer IP address to be filtered. Accepted format: `192.168.1.1` this field is optional.
* `FLOW_FILTER_ACTION` (default: unset). Action to be taken when the flow is filtered. Accepted values are `Accept`, `Reject`.
## Development-only variables ## Development-only variables
......
# eBPF Flow Rule Based Filtering
## Introduction
Flow rule-base filtering is a method to control the flow of packets cached in the eBPF flows table based on certain configuration
## Flow filter rule configuration
The Flow filter rule consists of two parts mandatory and optional parameters.
### Mandatory parameters
- `FLOW_FILTER_IP_CIDR` - IP address and CIDR mask for the flow filter rule, supports IPv4 and IPv6 address format.
If wanted to match against any IP, user can use `0.0.0.0/0` or `::/0` for IPv4 and IPv6 respectively.
- `FLOW_FILTER_ACTION` - Action to be taken for the flow filter rule. Possible values are `Accept` and `Reject`.
- For the matching rule with `Accept` action this flow will be allowed to be cached in eBPF table, with updated global metric `FlowFilterAcceptCounter`.
- For the matching rule with `Reject` action this flow will not be cached in eBPF table, with updated global metric `FlowFilterRejectCounter`.
- If the rule is not matched, based on the configured action if its `Accept` the flow will not be cached in eBPF table,
if the action is `Reject` then the flow will be cached in the eBPF table and a global metric `FlowFilterNoMatchCounter` will be updated.
### Optional parameters
- `FLOW_FILTER_DIRECTION` - Direction of the flow filter rule. Possible values are `Ingress` and `Egress`.
- `FLOW_FILTER_PROTOCOL` - Protocol of the flow filter rule. Possible values are `TCP`, `UDP`, `SCTP`, `ICMP`, `ICMPv6`.
- `FLOW_FILTER_SOURCE_PORT` - Single Source port of the flow filter rule.
- `FLOW_FILTER_SOURCE_PORT_RANGE` - Source port range of the flow filter rule. using "80-100" format.
- `FLOW_FILTER_DESTINATION_PORT` - Single Destination port of the flow filter rule.
- `FLOW_FILTER_DESTINATION_PORT_RANGE` - Destination port range of the flow filter rule. using "80-100" format.
- `FLOW_FILTER_PORT` - Single L4 port of the flow filter rule, can be either source or destination port.
- `FLOW_FILTER_PORT_RANGE` - L4 port range of the flow filter rule. using "80-100" format can be either source or destination ports range.
- `FLOW_FILTER_ICMP_TYPE` - ICMP type of the flow filter rule.
- `FLOW_FILTER_ICMP_CODE` - ICMP code of the flow filter rule.
- `FLOW_FILTER_PEER_IP` - Specific Peer IP address of the flow filter rule.
Note:
- for L4 ports configuration you can use either single port config options or the range but not both.
- use either specific src and/or dst ports or the generic port config that works for both direction.
## How does Flow Filtering work
### Flow Filter and CIDR Matching
The flow filter examines incoming or outgoing packets and attempts to match the source IP address or the destination IP address
of each packet against a CIDR range specified in the `FLOW_FILTER_IP_CIDR` parameter.
If the packet's source or destination IP address falls within the specified CIDR range, the filter takes action based on the configured rules.
This action could involve allowing the packet to be cached in an eBPF flow table or blocking it.
### Matching Specific Endpoints with `FLOW_FILTER_PEER_IP`
The `FLOW_FILTER_PEER_IP` parameter specifies the IP address of a specific endpoint.
Depending on whether the traffic is ingress (incoming) or egress (outgoing), this IP address is used to further refine
the filtering process:
- In ingress traffic filtering, the `FLOW_FILTER_PEER_IP` is used to match against the destination IP address of the packet.
After the initial CIDR matching, the filter then narrows down the scope to packets destined for a specific endpoint
specified by `FLOW_FILTER_PEER_IP`.
- In egress traffic filtering, the `FLOW_FILTER_PEER_IP` is used to match against the source IP address of the packet.
After the initial CIDR matching, the filter narrows down the scope to packets originating from a specific endpoint
specified by `FLOW_FILTER_PEER_IP`.
### How to fine tune the flow filter rule configuration?
We have many configuration options available for the flow filter rule configuration, but we can use them in combination to achieve the desired
flow filter rule configuration. Let's use some examples to understand how to fine tune the flow filter rule configuration.
#### Use-case 1:
Filter k8s service traffic to specific POD IP endpoint.
For example if we wanted to filter in incoming k8s service traffic coming from source `172.210.150.100` for `SCTP` protocol,
on specific dport range 80-100, and targeting specific POD IP endpoint at `10.10.10.10` we can use the following configuration:
```shell
FLOW_FILTER_IP_CIDR=172.210.150.1/24
FLOW_FILTER_ACTION=Accept
FLOW_FILTER_PROTOCOL=SCTP
FLOW_FILTER_DIRECTION=Ingress
FLOW_FILTER_DESTINATION_PORT_RANGE=80-100
FLOW_FILTER_PEER_IP=10.10.10.10
```
#### Use-case 2:
Users wanted to see flows after EgressIP feature is configured with EgressIP `192.168.127.12` for `TCP` protocol with sport `100`
to any cluster's outside addresses (destinations is unknown or don't care), so they can use the following configuration:
```shell
FLOW_FILTER_IP_CIDR=0.0.0.0/0
FLOW_FILTER_ACTION=Accept
FLOW_FILTER_PROTOCOL=TCP
FLOW_FILTER_DIRECTION=Egress
FLOW_FILTER_SOURCE_PORT=100
FLOW_FILTER_PEER_IP=192.168.127.12
```
#### Use-case 3:
OpenShift ovn kubernetes CNI uses `169.254.169.1-4` as masquerade addresses when handle host service traffic
I am not interested in capturing any those packets, so I can use the following configuration:
```shell
FLOW_FILTER_IP_CIDR=169.254.169.1/24
FLOW_FILTER_ACTION=Reject
FLOW_FILTER_DIRECTION=Ingress
```
#### Use-case 4:
We have case where ping traffic is going between PODA `1.1.1.10` to PODB in different node `1.2.1.10` for that we can use the following configuration:
```shell
FLOW_FILTER_IP_CIDR=1.1.1.10/32
FLOW_FILTER_ACTION=Accept
FLOW_FILTER_DIRECTION=Ingress
FLOW_FILTER_PROTOCOL=ICMP
FLOW_FILTER_PEER_IP=1.2.1.10
FLOW_FILTER_ICMP_TYPE=8
```
#### Use-case 5:
We wanted to filter in `curl` request and response for TCP flow going from PODA `1.1.1.10` to PODB in different node `1.2.1.10` using port `80`
for that we can use the following configuration:
```shell
FLOW_FILTER_IP_CIDR=1.1.1.10/32
FLOW_FILTER_ACTION=Accept
FLOW_FILTER_PROTOCOL=TCP
FLOW_FILTER_PORT=80
FLOW_FILTER_PEER_IP=1.2.1.10
```
...@@ -177,14 +177,25 @@ func FlowsAgent(cfg *Config) (*Flows, error) { ...@@ -177,14 +177,25 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
} }
ebpfConfig := &ebpf.FlowFetcherConfig{ ebpfConfig := &ebpf.FlowFetcherConfig{
EnableIngress: ingress, EnableIngress: ingress,
EnableEgress: egress, EnableEgress: egress,
Debug: debug, Debug: debug,
Sampling: cfg.Sampling, Sampling: cfg.Sampling,
CacheMaxSize: cfg.CacheMaxFlows, CacheMaxSize: cfg.CacheMaxFlows,
PktDrops: cfg.EnablePktDrops, PktDrops: cfg.EnablePktDrops,
DNSTracker: cfg.EnableDNSTracking, DNSTracker: cfg.EnableDNSTracking,
EnableRTT: cfg.EnableRTT, EnableRTT: cfg.EnableRTT,
EnableFlowFilter: cfg.EnableFlowFilter,
FlowFilterConfig: &ebpf.FlowFilterConfig{
FlowFilterAction: cfg.FlowFilterAction,
FlowFilterDirection: cfg.FlowFilterDirection,
FlowFilterIPCIDR: cfg.FlowFilterIPCIDR,
FlowFilterProtocol: cfg.FlowFilterProtocol,
FlowFilterPeerIP: cfg.FlowFilterPeerIP,
FlowFilterDestinationPort: ebpf.ConvertFilterPortsToInstr(cfg.FlowFilterDestinationPort, cfg.FlowFilterDestinationPortRange),
FlowFilterSourcePort: ebpf.ConvertFilterPortsToInstr(cfg.FlowFilterSourcePort, cfg.FlowFilterSourcePortRange),
FlowFilterPort: ebpf.ConvertFilterPortsToInstr(cfg.FlowFilterPort, cfg.FlowFilterPortRange),
},
} }
fetcher, err := ebpf.NewFlowFetcher(ebpfConfig) fetcher, err := ebpf.NewFlowFetcher(ebpfConfig)
......
...@@ -182,6 +182,43 @@ type Config struct { ...@@ -182,6 +182,43 @@ type Config struct {
// MetricsPrefix is the prefix of the metrics that are sent to the server. // MetricsPrefix is the prefix of the metrics that are sent to the server.
MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"` MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"`
// EnableFlowFilter enables flow filter, default is false.
EnableFlowFilter bool `env:"ENABLE_FLOW_FILTER" envDefault:"false"`
// FlowFilterDirection is the direction of the flow filter.
// Possible values are "Ingress" or "Egress".
FlowFilterDirection string `env:"FLOW_FILTER_DIRECTION"`
// FlowFilterIPCIDR is the IP CIDR to filter flows.
// Example: 10.10.10.0/24 or 100:100:100:100::/64
FlowFilterIPCIDR string `env:"FLOW_FILTER_IP_CIDR"`
// FlowFilterProtocol is the protocol to filter flows.
// Example: TCP, UDP, SCTP, ICMP, ICMPv6
FlowFilterProtocol string `env:"FLOW_FILTER_PROTOCOL"`
// FlowFilterSourcePort is the source port to filter flows.
FlowFilterSourcePort int32 `env:"FLOW_FILTER_SOURCE_PORT"`
// FlowFilterDestinationPort is the destination port to filter flows.
FlowFilterDestinationPort int32 `env:"FLOW_FILTER_DESTINATION_PORT"`
// FlowFilterPort is the port to filter flows, can be use for either source or destination port.
FlowFilterPort int32 `env:"FLOW_FILTER_PORT"`
// FlowFilterSourcePortRange is the source port range to filter flows.
// Example: 8000-8010
FlowFilterSourcePortRange string `env:"FLOW_FILTER_SOURCE_PORT_RANGE"`
// FlowFilterDestinationPortRange is the destination port range to filter flows.
// Example: 8000-8010
FlowFilterDestinationPortRange string `env:"FLOW_FILTER_DESTINATION_PORT_RANGE"`
// FlowFilterPortRange is the port range to filter flows, can be used for either source or destination port.
// Example: 8000-8010
FlowFilterPortRange string `env:"FLOW_FILTER_PORT_RANGE"`
// FlowFilterICMPType is the ICMP type to filter flows.
FlowFilterICMPType int `env:"FLOW_FILTER_ICMP_TYPE"`
// FlowFilterICMPCode is the ICMP code to filter flows.
FlowFilterICMPCode int `env:"FLOW_FILTER_ICMP_CODE"`
// FlowFilterPeerIP is the IP to filter flows.
// Example: 10.10.10.10
FlowFilterPeerIP string `env:"FLOW_FILTER_PEER_IP"`
// FlowFilterAction is the action to filter flows.
// Possible values are "Accept" or "Reject".
FlowFilterAction string `env:"FLOW_FILTER_ACTION"`
/* Deprecated configs are listed below this line /* Deprecated configs are listed below this line
* See manageDeprecatedConfigs function for details * See manageDeprecatedConfigs function for details
*/ */
......
...@@ -12,6 +12,14 @@ import ( ...@@ -12,6 +12,14 @@ import (
"github.com/cilium/ebpf" "github.com/cilium/ebpf"
) )
type BpfDirectionT uint32
const (
BpfDirectionTINGRESS BpfDirectionT = 0
BpfDirectionTEGRESS BpfDirectionT = 1
BpfDirectionTMAX_DIRECTION BpfDirectionT = 2
)
type BpfDnsFlowId struct { type BpfDnsFlowId struct {
SrcPort uint16 SrcPort uint16
DstPort uint16 DstPort uint16
...@@ -28,6 +36,34 @@ type BpfDnsRecordT struct { ...@@ -28,6 +36,34 @@ type BpfDnsRecordT struct {
Errno uint8 Errno uint8
} }
type BpfFilterActionT uint32
const (
BpfFilterActionTACCEPT BpfFilterActionT = 0
BpfFilterActionTREJECT BpfFilterActionT = 1
BpfFilterActionTMAX_FILTER_ACTIONS BpfFilterActionT = 2
)
type BpfFilterKeyT struct {
PrefixLen uint32
IpData [16]uint8
}
type BpfFilterValueT struct {
Protocol uint8
DstPortStart uint16
DstPortEnd uint16
SrcPortStart uint16
SrcPortEnd uint16
PortStart uint16
PortEnd uint16
IcmpType uint8
IcmpCode uint8
Direction BpfDirectionT
Action BpfFilterActionT
Ip [16]uint8
}
type BpfFlowId BpfFlowIdT type BpfFlowId BpfFlowIdT
type BpfFlowIdT struct { type BpfFlowIdT struct {
...@@ -69,6 +105,10 @@ type BpfGlobalCountersKeyT uint32 ...@@ -69,6 +105,10 @@ type BpfGlobalCountersKeyT uint32
const ( const (
BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY BpfGlobalCountersKeyT = 0 BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY BpfGlobalCountersKeyT = 0
BpfGlobalCountersKeyTFILTER_FLOWS_REJECT_KEY BpfGlobalCountersKeyT = 1
BpfGlobalCountersKeyTFILTER_FLOWS_ACCEPT_KEY BpfGlobalCountersKeyT = 2
BpfGlobalCountersKeyTFILTER_FLOWS_NOMATCH_KEY BpfGlobalCountersKeyT = 3
BpfGlobalCountersKeyTMAX_DROPPED_FLOWS_KEY BpfGlobalCountersKeyT = 4
) )
type BpfPktDropsT struct { type BpfPktDropsT struct {
...@@ -136,6 +176,7 @@ type BpfMapSpecs struct { ...@@ -136,6 +176,7 @@ type BpfMapSpecs struct {
AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"` AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"`
DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"` DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"`
DnsFlows *ebpf.MapSpec `ebpf:"dns_flows"` DnsFlows *ebpf.MapSpec `ebpf:"dns_flows"`
FilterMap *ebpf.MapSpec `ebpf:"filter_map"`
GlobalCounters *ebpf.MapSpec `ebpf:"global_counters"` GlobalCounters *ebpf.MapSpec `ebpf:"global_counters"`
PacketRecord *ebpf.MapSpec `ebpf:"packet_record"` PacketRecord *ebpf.MapSpec `ebpf:"packet_record"`
} }
...@@ -162,6 +203,7 @@ type BpfMaps struct { ...@@ -162,6 +203,7 @@ type BpfMaps struct {
AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"` AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"`
DirectFlows *ebpf.Map `ebpf:"direct_flows"` DirectFlows *ebpf.Map `ebpf:"direct_flows"`
DnsFlows *ebpf.Map `ebpf:"dns_flows"` DnsFlows *ebpf.Map `ebpf:"dns_flows"`
FilterMap *ebpf.Map `ebpf:"filter_map"`
GlobalCounters *ebpf.Map `ebpf:"global_counters"` GlobalCounters *ebpf.Map `ebpf:"global_counters"`
PacketRecord *ebpf.Map `ebpf:"packet_record"` PacketRecord *ebpf.Map `ebpf:"packet_record"`
} }
...@@ -171,6 +213,7 @@ func (m *BpfMaps) Close() error { ...@@ -171,6 +213,7 @@ func (m *BpfMaps) Close() error {
m.AggregatedFlows, m.AggregatedFlows,
m.DirectFlows, m.DirectFlows,
m.DnsFlows, m.DnsFlows,
m.FilterMap,
m.GlobalCounters, m.GlobalCounters,
m.PacketRecord, m.PacketRecord,
) )
......
No preview for this file type
...@@ -12,6 +12,14 @@ import ( ...@@ -12,6 +12,14 @@ import (
"github.com/cilium/ebpf" "github.com/cilium/ebpf"
) )
type BpfDirectionT uint32
const (
BpfDirectionTINGRESS BpfDirectionT = 0
BpfDirectionTEGRESS BpfDirectionT = 1
BpfDirectionTMAX_DIRECTION BpfDirectionT = 2
)
type BpfDnsFlowId struct { type BpfDnsFlowId struct {
SrcPort uint16 SrcPort uint16
DstPort uint16 DstPort uint16
...@@ -28,6 +36,34 @@ type BpfDnsRecordT struct { ...@@ -28,6 +36,34 @@ type BpfDnsRecordT struct {
Errno uint8 Errno uint8
} }
type BpfFilterActionT uint32
const (
BpfFilterActionTACCEPT BpfFilterActionT = 0
BpfFilterActionTREJECT BpfFilterActionT = 1
BpfFilterActionTMAX_FILTER_ACTIONS BpfFilterActionT = 2
)
type BpfFilterKeyT struct {
PrefixLen uint32
IpData [16]uint8
}
type BpfFilterValueT struct {
Protocol uint8
DstPortStart uint16
DstPortEnd uint16
SrcPortStart uint16
SrcPortEnd uint16
PortStart uint16
PortEnd uint16
IcmpType uint8
IcmpCode uint8
Direction BpfDirectionT
Action BpfFilterActionT
Ip [16]uint8
}
type BpfFlowId BpfFlowIdT type BpfFlowId BpfFlowIdT
type BpfFlowIdT struct { type BpfFlowIdT struct {
...@@ -69,6 +105,10 @@ type BpfGlobalCountersKeyT uint32 ...@@ -69,6 +105,10 @@ type BpfGlobalCountersKeyT uint32
const ( const (
BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY BpfGlobalCountersKeyT = 0 BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY BpfGlobalCountersKeyT = 0
BpfGlobalCountersKeyTFILTER_FLOWS_REJECT_KEY BpfGlobalCountersKeyT = 1
BpfGlobalCountersKeyTFILTER_FLOWS_ACCEPT_KEY BpfGlobalCountersKeyT = 2
BpfGlobalCountersKeyTFILTER_FLOWS_NOMATCH_KEY BpfGlobalCountersKeyT = 3
BpfGlobalCountersKeyTMAX_DROPPED_FLOWS_KEY BpfGlobalCountersKeyT = 4
) )
type BpfPktDropsT struct { type BpfPktDropsT struct {
...@@ -136,6 +176,7 @@ type BpfMapSpecs struct { ...@@ -136,6 +176,7 @@ type BpfMapSpecs struct {
AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"` AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"`
DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"` DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"`
DnsFlows *ebpf.MapSpec `ebpf:"dns_flows"` DnsFlows *ebpf.MapSpec `ebpf:"dns_flows"`
FilterMap *ebpf.MapSpec `ebpf:"filter_map"`
GlobalCounters *ebpf.MapSpec `ebpf:"global_counters"` GlobalCounters *ebpf.MapSpec `ebpf:"global_counters"`
PacketRecord *ebpf.MapSpec `ebpf:"packet_record"` PacketRecord *ebpf.MapSpec `ebpf:"packet_record"`
} }
...@@ -162,6 +203,7 @@ type BpfMaps struct { ...@@ -162,6 +203,7 @@ type BpfMaps struct {
AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"` AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"`
DirectFlows *ebpf.Map `ebpf:"direct_flows"` DirectFlows *ebpf.Map `ebpf:"direct_flows"`
DnsFlows *ebpf.Map `ebpf:"dns_flows"` DnsFlows *ebpf.Map `ebpf:"dns_flows"`
FilterMap *ebpf.Map `ebpf:"filter_map"`
GlobalCounters *ebpf.Map `ebpf:"global_counters"` GlobalCounters *ebpf.Map `ebpf:"global_counters"`
PacketRecord *ebpf.Map `ebpf:"packet_record"` PacketRecord *ebpf.Map `ebpf:"packet_record"`
} }
...@@ -171,6 +213,7 @@ func (m *BpfMaps) Close() error { ...@@ -171,6 +213,7 @@ func (m *BpfMaps) Close() error {
m.AggregatedFlows, m.AggregatedFlows,
m.DirectFlows, m.DirectFlows,
m.DnsFlows, m.DnsFlows,
m.FilterMap,
m.GlobalCounters, m.GlobalCounters,
m.PacketRecord, m.PacketRecord,
) )
......
No preview for this file type
...@@ -12,6 +12,14 @@ import ( ...@@ -12,6 +12,14 @@ import (
"github.com/cilium/ebpf" "github.com/cilium/ebpf"
) )
type BpfDirectionT uint32
const (
BpfDirectionTINGRESS BpfDirectionT = 0
BpfDirectionTEGRESS BpfDirectionT = 1
BpfDirectionTMAX_DIRECTION BpfDirectionT = 2
)
type BpfDnsFlowId struct { type BpfDnsFlowId struct {
SrcPort uint16 SrcPort uint16
DstPort uint16 DstPort uint16
...@@ -28,6 +36,34 @@ type BpfDnsRecordT struct { ...@@ -28,6 +36,34 @@ type BpfDnsRecordT struct {
Errno uint8 Errno uint8
} }
type BpfFilterActionT uint32
const (
BpfFilterActionTACCEPT BpfFilterActionT = 0
BpfFilterActionTREJECT BpfFilterActionT = 1
BpfFilterActionTMAX_FILTER_ACTIONS BpfFilterActionT = 2
)
type BpfFilterKeyT struct {
PrefixLen uint32
IpData [16]uint8
}
type BpfFilterValueT struct {
Protocol uint8
DstPortStart uint16
DstPortEnd uint16
SrcPortStart uint16
SrcPortEnd uint16
PortStart uint16
PortEnd uint16
IcmpType uint8
IcmpCode uint8
Direction BpfDirectionT
Action BpfFilterActionT
Ip [16]uint8
}
type BpfFlowId BpfFlowIdT type BpfFlowId BpfFlowIdT
type BpfFlowIdT struct { type BpfFlowIdT struct {
...@@ -69,6 +105,10 @@ type BpfGlobalCountersKeyT uint32 ...@@ -69,6 +105,10 @@ type BpfGlobalCountersKeyT uint32
const ( const (
BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY BpfGlobalCountersKeyT = 0 BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY BpfGlobalCountersKeyT = 0
BpfGlobalCountersKeyTFILTER_FLOWS_REJECT_KEY BpfGlobalCountersKeyT = 1
BpfGlobalCountersKeyTFILTER_FLOWS_ACCEPT_KEY BpfGlobalCountersKeyT = 2
BpfGlobalCountersKeyTFILTER_FLOWS_NOMATCH_KEY BpfGlobalCountersKeyT = 3
BpfGlobalCountersKeyTMAX_DROPPED_FLOWS_KEY BpfGlobalCountersKeyT = 4
) )
type BpfPktDropsT struct { type BpfPktDropsT struct {
...@@ -136,6 +176,7 @@ type BpfMapSpecs struct { ...@@ -136,6 +176,7 @@ type BpfMapSpecs struct {
AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"` AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"`
DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"` DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"`
DnsFlows *ebpf.MapSpec `ebpf:"dns_flows"` DnsFlows *ebpf.MapSpec `ebpf:"dns_flows"`
FilterMap *ebpf.MapSpec `ebpf:"filter_map"`
GlobalCounters *ebpf.MapSpec `ebpf:"global_counters"` GlobalCounters *ebpf.MapSpec `ebpf:"global_counters"`
PacketRecord *ebpf.MapSpec `ebpf:"packet_record"` PacketRecord *ebpf.MapSpec `ebpf:"packet_record"`
} }
...@@ -162,6 +203,7 @@ type BpfMaps struct { ...@@ -162,6 +203,7 @@ type BpfMaps struct {
AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"` AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"`
DirectFlows *ebpf.Map `ebpf:"direct_flows"` DirectFlows *ebpf.Map `ebpf:"direct_flows"`
DnsFlows *ebpf.Map `ebpf:"dns_flows"` DnsFlows *ebpf.Map `ebpf:"dns_flows"`
FilterMap *ebpf.Map `ebpf:"filter_map"`
GlobalCounters *ebpf.Map `ebpf:"global_counters"` GlobalCounters *ebpf.Map `ebpf:"global_counters"`
PacketRecord *ebpf.Map `ebpf:"packet_record"` PacketRecord *ebpf.Map `ebpf:"packet_record"`
} }
...@@ -171,6 +213,7 @@ func (m *BpfMaps) Close() error { ...@@ -171,6 +213,7 @@ func (m *BpfMaps) Close() error {
m.AggregatedFlows, m.AggregatedFlows,
m.DirectFlows, m.DirectFlows,
m.DnsFlows, m.DnsFlows,
m.FilterMap,
m.GlobalCounters, m.GlobalCounters,
m.PacketRecord, m.PacketRecord,
) )
......
No preview for this file type
...@@ -12,6 +12,14 @@ import ( ...@@ -12,6 +12,14 @@ import (
"github.com/cilium/ebpf" "github.com/cilium/ebpf"
) )
type BpfDirectionT uint32
const (
BpfDirectionTINGRESS BpfDirectionT = 0
BpfDirectionTEGRESS BpfDirectionT = 1
BpfDirectionTMAX_DIRECTION BpfDirectionT = 2
)
type BpfDnsFlowId struct { type BpfDnsFlowId struct {
SrcPort uint16 SrcPort uint16
DstPort uint16 DstPort uint16
...@@ -28,6 +36,34 @@ type BpfDnsRecordT struct { ...@@ -28,6 +36,34 @@ type BpfDnsRecordT struct {
Errno uint8 Errno uint8
} }
type BpfFilterActionT uint32
const (
BpfFilterActionTACCEPT BpfFilterActionT = 0
BpfFilterActionTREJECT BpfFilterActionT = 1
BpfFilterActionTMAX_FILTER_ACTIONS BpfFilterActionT = 2
)
type BpfFilterKeyT struct {
PrefixLen uint32
IpData [16]uint8
}
type BpfFilterValueT struct {
Protocol uint8
DstPortStart uint16
DstPortEnd uint16
SrcPortStart uint16
SrcPortEnd uint16
PortStart uint16
PortEnd uint16
IcmpType uint8
IcmpCode uint8
Direction BpfDirectionT
Action BpfFilterActionT
Ip [16]uint8
}
type BpfFlowId BpfFlowIdT type BpfFlowId BpfFlowIdT
type BpfFlowIdT struct { type BpfFlowIdT struct {
...@@ -69,6 +105,10 @@ type BpfGlobalCountersKeyT uint32 ...@@ -69,6 +105,10 @@ type BpfGlobalCountersKeyT uint32
const ( const (
BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY BpfGlobalCountersKeyT = 0 BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY BpfGlobalCountersKeyT = 0
BpfGlobalCountersKeyTFILTER_FLOWS_REJECT_KEY BpfGlobalCountersKeyT = 1
BpfGlobalCountersKeyTFILTER_FLOWS_ACCEPT_KEY BpfGlobalCountersKeyT = 2
BpfGlobalCountersKeyTFILTER_FLOWS_NOMATCH_KEY BpfGlobalCountersKeyT = 3
BpfGlobalCountersKeyTMAX_DROPPED_FLOWS_KEY BpfGlobalCountersKeyT = 4
) )
type BpfPktDropsT struct { type BpfPktDropsT struct {
...@@ -136,6 +176,7 @@ type BpfMapSpecs struct { ...@@ -136,6 +176,7 @@ type BpfMapSpecs struct {
AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"` AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"`
DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"` DirectFlows *ebpf.MapSpec `ebpf:"direct_flows"`
DnsFlows *ebpf.MapSpec `ebpf:"dns_flows"` DnsFlows *ebpf.MapSpec `ebpf:"dns_flows"`
FilterMap *ebpf.MapSpec `ebpf:"filter_map"`
GlobalCounters *ebpf.MapSpec `ebpf:"global_counters"` GlobalCounters *ebpf.MapSpec `ebpf:"global_counters"`
PacketRecord *ebpf.MapSpec `ebpf:"packet_record"` PacketRecord *ebpf.MapSpec `ebpf:"packet_record"`
} }
...@@ -162,6 +203,7 @@ type BpfMaps struct { ...@@ -162,6 +203,7 @@ type BpfMaps struct {
AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"` AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"`
DirectFlows *ebpf.Map `ebpf:"direct_flows"` DirectFlows *ebpf.Map `ebpf:"direct_flows"`
DnsFlows *ebpf.Map `ebpf:"dns_flows"` DnsFlows *ebpf.Map `ebpf:"dns_flows"`
FilterMap *ebpf.Map `ebpf:"filter_map"`
GlobalCounters *ebpf.Map `ebpf:"global_counters"` GlobalCounters *ebpf.Map `ebpf:"global_counters"`
PacketRecord *ebpf.Map `ebpf:"packet_record"` PacketRecord *ebpf.Map `ebpf:"packet_record"`
} }
...@@ -171,6 +213,7 @@ func (m *BpfMaps) Close() error { ...@@ -171,6 +213,7 @@ func (m *BpfMaps) Close() error {
m.AggregatedFlows, m.AggregatedFlows,
m.DirectFlows, m.DirectFlows,
m.DnsFlows, m.DnsFlows,
m.FilterMap,
m.GlobalCounters, m.GlobalCounters,
m.PacketRecord, m.PacketRecord,
) )
......
No preview for this file type
package ebpf
import (
"fmt"
"net"
"strconv"
"strings"
"syscall"
"github.com/cilium/ebpf"
"k8s.io/apimachinery/pkg/util/intstr"
)
type FlowFilterConfig struct {
FlowFilterDirection string
FlowFilterIPCIDR string
FlowFilterProtocol string
FlowFilterSourcePort intstr.IntOrString
FlowFilterDestinationPort intstr.IntOrString
FlowFilterPort intstr.IntOrString
FlowFilterIcmpType int
FlowFilterIcmpCode int
FlowFilterPeerIP string
FlowFilterAction string
}
type FlowFilter struct {
// eBPF objs to create/update eBPF maps
objects *BpfObjects
config *FlowFilterConfig
}
func NewFlowFilter(objects *BpfObjects, cfg *FlowFilterConfig) *FlowFilter {
return &FlowFilter{
objects: objects,
config: cfg,
}
}
func (f *FlowFilter) ProgramFlowFilter() error {
log.Infof("Flow filter config: %v", f.config)
key, err := f.getFlowFilterKey(f.config)
if err != nil {
return fmt.Errorf("failed to get flow filter key: %w", err)
}
val, err := f.getFlowFilterValue(f.config)
if err != nil {
return fmt.Errorf("failed to get flow filter value: %w", err)
}
err = f.objects.FilterMap.Update(key, val, ebpf.UpdateAny)
if err != nil {
return fmt.Errorf("failed to update flow filter map: %w", err)
}
log.Infof("Programmed flow filter with key: %v, value: %v", key, val)
return nil
}
func (f *FlowFilter) getFlowFilterKey(config *FlowFilterConfig) (BpfFilterKeyT, error) {
key := BpfFilterKeyT{}
ip, ipNet, err := net.ParseCIDR(config.FlowFilterIPCIDR)
if err != nil {
return key, fmt.Errorf("failed to parse FlowFilterIPCIDR: %w", err)
}
if ip.To4() != nil {
copy(key.IpData[:], ip.To4())
} else {
copy(key.IpData[:], ip.To16())
}
pfLen, _ := ipNet.Mask.Size()
key.PrefixLen = uint32(pfLen)
return key, nil
}
func (f *FlowFilter) getFlowFilterValue(config *FlowFilterConfig) (BpfFilterValueT, error) {
val := BpfFilterValueT{}
switch config.FlowFilterDirection {
case "Ingress":
val.Direction = BpfDirectionTINGRESS
case "Egress":
val.Direction = BpfDirectionTEGRESS
default:
val.Direction = BpfDirectionTMAX_DIRECTION
}
switch config.FlowFilterAction {
case "Reject":
val.Action = BpfFilterActionTREJECT
case "Accept":
val.Action = BpfFilterActionTACCEPT
default:
val.Action = BpfFilterActionTMAX_FILTER_ACTIONS
}
switch config.FlowFilterProtocol {
case "TCP":
val.Protocol = syscall.IPPROTO_TCP
val.DstPortStart, val.DstPortEnd = getDstPorts(config)
val.SrcPortStart, val.SrcPortEnd = getSrcPorts(config)
val.PortStart, val.PortEnd = getPorts(config)
case "UDP":
val.Protocol = syscall.IPPROTO_UDP
val.DstPortStart, val.DstPortEnd = getDstPorts(config)
val.SrcPortStart, val.SrcPortEnd = getSrcPorts(config)
val.PortStart, val.PortEnd = getPorts(config)
case "SCTP":
val.Protocol = syscall.IPPROTO_SCTP
val.DstPortStart, val.DstPortEnd = getDstPorts(config)
val.SrcPortStart, val.SrcPortEnd = getSrcPorts(config)
val.PortStart, val.PortEnd = getPorts(config)
case "ICMP":
val.Protocol = syscall.IPPROTO_ICMP
val.IcmpType = uint8(config.FlowFilterIcmpType)
val.IcmpCode = uint8(config.FlowFilterIcmpCode)
case "ICMPv6":
val.Protocol = syscall.IPPROTO_ICMPV6
val.IcmpType = uint8(config.FlowFilterIcmpType)
val.IcmpCode = uint8(config.FlowFilterIcmpCode)
}
if config.FlowFilterPeerIP != "" {
ip := net.ParseIP(config.FlowFilterPeerIP)
if ip.To4() != nil {
copy(val.Ip[:], ip.To4())
} else {
copy(val.Ip[:], ip.To16())
}
}
return val, nil
}
func getSrcPorts(config *FlowFilterConfig) (uint16, uint16) {
if config.FlowFilterSourcePort.Type == intstr.Int {
return uint16(config.FlowFilterSourcePort.IntVal), 0
}
start, end, err := getPortsFromString(config.FlowFilterSourcePort.String())
if err != nil {
return 0, 0
}
return start, end
}
func getDstPorts(config *FlowFilterConfig) (uint16, uint16) {
if config.FlowFilterDestinationPort.Type == intstr.Int {
return uint16(config.FlowFilterDestinationPort.IntVal), 0
}
start, end, err := getPortsFromString(config.FlowFilterDestinationPort.String())
if err != nil {
return 0, 0
}
return start, end
}
func getPorts(config *FlowFilterConfig) (uint16, uint16) {
if config.FlowFilterDestinationPort.Type == intstr.Int {
return uint16(config.FlowFilterPort.IntVal), 0
}
start, end, err := getPortsFromString(config.FlowFilterPort.String())
if err != nil {
return 0, 0
}
return start, end
}
func getPortsFromString(s string) (uint16, uint16, error) {
ps := strings.SplitN(s, "-", 2)
if len(ps) != 2 {
return 0, 0, fmt.Errorf("invalid ports range. Expected two integers separated by hyphen but found %s", s)
}
startPort, err := strconv.ParseUint(ps[0], 10, 16)
if err != nil {
return 0, 0, fmt.Errorf("invalid start port number %w", err)
}
endPort, err := strconv.ParseUint(ps[1], 10, 16)
if err != nil {
return 0, 0, fmt.Errorf("invalid end port number %w", err)
}
if startPort > endPort {
return 0, 0, fmt.Errorf("invalid port range. Start port is greater than end port")
}
if startPort == endPort {
return 0, 0, fmt.Errorf("invalid port range. Start and end port are equal. Remove the hyphen and enter a single port")
}
if startPort == 0 {
return 0, 0, fmt.Errorf("invalid start port 0")
}
return uint16(startPort), uint16(endPort), nil
}
func ConvertFilterPortsToInstr(intPort int32, rangePorts string) intstr.IntOrString {
if rangePorts == "" {
return intstr.FromInt32(intPort)
}
return intstr.FromString(rangePorts)
}
package ebpf
import (
"fmt"
"net"
"syscall"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/intstr"
)
func TestGetPortsFromString(t *testing.T) {
testCases := []struct {
input string
expectedStart uint16
expectedEnd uint16
expectedError error
}{
{
input: "80-90",
expectedStart: 80,
expectedEnd: 90,
expectedError: nil,
},
{
input: "90-80",
expectedStart: 0,
expectedEnd: 0,
expectedError: fmt.Errorf("invalid port range. Start port is greater than end port"),
},
{
input: "80",
expectedStart: 0,
expectedEnd: 0,
expectedError: fmt.Errorf("invalid ports range. Expected two integers separated by hyphen but found 80"),
},
{
input: "80000-8080",
expectedStart: 0,
expectedEnd: 0,
expectedError: fmt.Errorf("invalid start port number strconv.ParseUint: parsing \"80000\": value out of range"),
},
}
for _, tc := range testCases {
start, end, err := getPortsFromString(tc.input)
if tc.expectedError != nil {
require.Error(t, err)
require.Equal(t, tc.expectedError.Error(), err.Error())
} else {
require.NoError(t, err)
require.Equal(t, tc.expectedStart, start)
require.Equal(t, tc.expectedEnd, end)
}
}
}
func TestFlowFilter_getFlowFilterKey(t *testing.T) {
f := FlowFilter{}
config := &FlowFilterConfig{
FlowFilterIPCIDR: "192.168.1.0/24",
}
expectedIP := net.ParseIP("192.168.1.0").To4()
expectedPrefixLen := uint32(24)
key, err := f.getFlowFilterKey(config)
assert.Nil(t, err)
assert.Equal(t, []uint8(expectedIP), key.IpData[:4])
assert.Equal(t, expectedPrefixLen, key.PrefixLen)
}
func TestFlowFilter_getFlowFilterValue(t *testing.T) {
f := FlowFilter{}
config := &FlowFilterConfig{
FlowFilterDirection: "Ingress",
FlowFilterProtocol: "TCP",
FlowFilterSourcePort: intstr.FromInt32(8080),
FlowFilterDestinationPort: intstr.FromString("8000-9000"),
}
value, err := f.getFlowFilterValue(config)
assert.Nil(t, err)
assert.Equal(t, BpfDirectionTINGRESS, value.Direction)
assert.Equal(t, uint8(syscall.IPPROTO_TCP), value.Protocol)
assert.Equal(t, uint16(8080), value.SrcPortStart)
assert.Equal(t, uint16(0), value.SrcPortEnd)
assert.Equal(t, uint16(8000), value.DstPortStart)
assert.Equal(t, uint16(9000), value.DstPortEnd)
}
func TestGetSrcPorts(t *testing.T) {
config := &FlowFilterConfig{
FlowFilterSourcePort: intstr.FromString("8000-9000"),
}
start, end := getSrcPorts(config)
assert.Equal(t, uint16(8000), start)
assert.Equal(t, uint16(9000), end)
}
func TestGetDstPorts(t *testing.T) {
config := &FlowFilterConfig{
FlowFilterDestinationPort: intstr.FromInt32(8080),
}
start, end := getDstPorts(config)
assert.Equal(t, uint16(8080), start)
assert.Equal(t, uint16(0), end)
}
func TestConvertFilterPortsToInstr(t *testing.T) {
t.Run("converts int port", func(t *testing.T) {
port := int32(80)
result := ConvertFilterPortsToInstr(port, "")
require.Equal(t, intstr.FromInt32(port), result)
})
t.Run("converts string range", func(t *testing.T) {
rangeStr := "80-90"
result := ConvertFilterPortsToInstr(0, rangeStr)
require.Equal(t, intstr.FromString(rangeStr), result)
})
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment