Newer
Older
shach33
committed
// registerEgress obtains the egress filter for the given link from
// fetchEgressEvents, using the TcEgressPcaParse program, and records it in
// p.egressFilters under iface so that Close can delete it later.
//
// It returns the error from fetchEgressEvents unchanged; in that case nothing
// is recorded.
func (p *PacketFetcher) registerEgress(iface ifaces.Interface, ipvlan netlink.Link) error {
	egressFilter, err := fetchEgressEvents(iface, ipvlan, p.objects.TcEgressPcaParse, "tc_egress_pca_parse")
	if err != nil {
		return err
	}

	p.egressFilters[iface] = egressFilter
	return nil
}
// fetchIngressEvents builds a direct-action TC BPF filter backed by parser,
// attaches it to the ingress hook of ipvlan and returns it.
//
// A filter with the same attributes that is already attached is deleted first
// (a warning is logged when that happens). If adding the filter reports that it
// already exists, the condition is logged and ignored; any other failure is
// returned wrapped.
//
// The filter is named "tc/" + name. iface is only used to tag log entries.
func fetchIngressEvents(iface ifaces.Interface, ipvlan netlink.Link, parser *ebpf.Program, name string) (*netlink.BpfFilter, error) {
	ilog := plog.WithField("iface", iface)

	ingressFilter := &netlink.BpfFilter{
		FilterAttrs: netlink.FilterAttrs{
			LinkIndex: ipvlan.Attrs().Index,
			Parent:    netlink.HANDLE_MIN_INGRESS,
			Handle:    netlink.MakeHandle(0, 1),
			// NOTE(review): 3 matches the value of ETH_P_ALL in
			// linux/if_ether.h — confirm that is the intent.
			Protocol: 3,
			Priority: 1,
		},
		Fd:           parser.FD(),
		Name:         "tc/" + name,
		DirectAction: true,
	}

	// Best-effort removal of a leftover filter; a failure here simply means
	// there was nothing to delete.
	if err := netlink.FilterDel(ingressFilter); err == nil {
		ilog.Warn("ingress filter already existed. Deleted it")
	}

	if err := netlink.FilterAdd(ingressFilter); err != nil {
		if !errors.Is(err, fs.ErrExist) {
			return nil, fmt.Errorf("failed to create ingress filter: %w", err)
		}
		ilog.WithError(err).Warn("ingress filter already exists. Ignoring")
	}
	return ingressFilter, nil
}
// registerIngress obtains the ingress filter for the given link from
// fetchIngressEvents, using the TcIngressPcaParse program, and records it in
// p.ingressFilters under iface so that Close can delete it later.
//
// It returns the error from fetchIngressEvents unchanged; in that case nothing
// is recorded.
func (p *PacketFetcher) registerIngress(iface ifaces.Interface, ipvlan netlink.Link) error {
	ingressFilter, err := fetchIngressEvents(iface, ipvlan, p.objects.TcIngressPcaParse, "tc_ingress_pca_parse")
	if err != nil {
		return err
	}

	p.ingressFilters[iface] = ingressFilter
	return nil
}
// Close the eBPF fetcher from the system.
// We don't need an "Close(iface)" method because the filters and qdiscs
// are automatically removed when the interface is down
func (p *PacketFetcher) Close() error {
log.Debug("unregistering eBPF objects")
var errs []error
if p.perfReader != nil {
if err := p.perfReader.Close(); err != nil {
errs = append(errs, err)
}
}
if p.objects != nil {
Mohamed S. Mahmoud
committed
if err := p.objects.TcEgressPcaParse.Close(); err != nil {
errs = append(errs, err)
}
if err := p.objects.TcIngressPcaParse.Close(); err != nil {
shach33
committed
errs = append(errs, err)
}
Mohamed S. Mahmoud
committed
if err := p.objects.TcxEgressPcaParse.Close(); err != nil {
errs = append(errs, err)
}
if err := p.objects.TcxIngressPcaParse.Close(); err != nil {
shach33
committed
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
errs = append(errs, err)
}
if err := p.objects.PacketRecord.Close(); err != nil {
errs = append(errs, err)
}
p.objects = nil
}
for iface, ef := range p.egressFilters {
log.WithField("interface", iface).Debug("deleting egress filter")
if err := netlink.FilterDel(ef); err != nil {
errs = append(errs, fmt.Errorf("deleting egress filter: %w", err))
}
}
p.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{}
for iface, igf := range p.ingressFilters {
log.WithField("interface", iface).Debug("deleting ingress filter")
if err := netlink.FilterDel(igf); err != nil {
errs = append(errs, fmt.Errorf("deleting ingress filter: %w", err))
}
}
p.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{}
for iface, qd := range p.qdiscs {
log.WithField("interface", iface).Debug("deleting Qdisc")
if err := netlink.QdiscDel(qd); err != nil {
errs = append(errs, fmt.Errorf("deleting qdisc: %w", err))
}
}
p.qdiscs = map[ifaces.Interface]*netlink.GenericQdisc{}
if len(errs) == 0 {
return nil
}
Mohamed S. Mahmoud
committed
for iface, l := range p.egressTCXLink {
log := log.WithField("interface", iface)
log.Debug("detach egress TCX hook")
l.Close()
}
p.egressTCXLink = map[ifaces.Interface]link.Link{}
for iface, l := range p.ingressTCXLink {
log := log.WithField("interface", iface)
log.Debug("detach ingress TCX hook")
l.Close()
}
p.ingressTCXLink = map[ifaces.Interface]link.Link{}
shach33
committed
var errStrings []string
for _, err := range errs {
errStrings = append(errStrings, err.Error())
}
return errors.New(`errors: "` + strings.Join(errStrings, `", "`) + `"`)
}
// ReadPerf reads the next record from the fetcher's perf reader and returns it
// together with any error the reader reports.
func (p *PacketFetcher) ReadPerf() (perf.Record, error) {
	record, err := p.perfReader.Read()
	return record, err
}
// LookupAndDeleteMap drains the PacketRecord eBPF map and returns the entries
// it removed, keyed by packet id.
//
// The map is first iterated to collect the ids only; each id is then fetched
// and removed with a single LookupAndDelete call. Ids inserted after the
// iteration are left for the next invocation.
//
// If the fetcher is already flagged as not supporting LookupAndDelete, or the
// very first LookupAndDelete call reports ebpf.ErrNotSupported, the work is
// delegated to legacyLookupAndDeleteMap (and the flag is cleared for later
// calls). Any other per-entry failure is logged, counted in met, and the entry
// is skipped.
func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
	if !p.lookupAndDeleteSupported {
		return p.legacyLookupAndDeleteMap(met)
	}

	packetMap := p.objects.PacketRecord
	iterator := packetMap.Iterate()
	packets := make(map[int][]*byte, p.cacheMaxSize)

	var (
		id     int
		ids    []int
		packet []*byte
	)

	// First, get all ids and ignore content (we need lookup+delete to be atomic)
	for iterator.Next(&id, &packet) {
		ids = append(ids, id)
	}

	// Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
	for i, packetID := range ids {
		if err := packetMap.LookupAndDelete(&packetID, &packet); err != nil {
			if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
				log.WithError(err).Warnf("switching to legacy mode")
				p.lookupAndDeleteSupported = false
				return p.legacyLookupAndDeleteMap(met)
			}
			log.WithError(err).WithField("packetID", packetID).Warnf("couldn't delete entry")
			met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry").Inc()
			// The lookup failed, so packet does not hold this id's data;
			// do not record it.
			continue
		}
		packets[packetID] = packet
	}

	return packets
}