Skip to content
Snippets Groups Projects
tracer.go 37.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • 
    // registerEgress obtains the egress filter for iface from fetchEgressEvents,
    // using the TcEgressPcaParse program, and records it in p.egressFilters.
    // If fetchEgressEvents fails, its error is returned unchanged and nothing is
    // recorded.
    func (p *PacketFetcher) registerEgress(iface ifaces.Interface, ipvlan netlink.Link) error {
    	filter, err := fetchEgressEvents(iface, ipvlan, p.objects.TcEgressPcaParse, "tc_egress_pca_parse")
    	if err == nil {
    		// Keep the filter so Close can delete it later.
    		p.egressFilters[iface] = filter
    	}
    	return err
    }
    
    // fetchIngressEvents attaches parser as a direct-action BPF filter named
    // "tc/"+name on the ingress parent of ipvlan and returns the filter descriptor.
    //
    // A filter with the same attributes is deleted first (a warning is logged when
    // that deletion succeeds). If adding the filter then reports fs.ErrExist, a
    // warning is logged and the descriptor is still returned; any other add error
    // is returned wrapped, with a nil filter.
    func fetchIngressEvents(iface ifaces.Interface, ipvlan netlink.Link, parser *ebpf.Program, name string) (*netlink.BpfFilter, error) {
    	ilog := plog.WithField("iface", iface)
    	ingressAttrs := netlink.FilterAttrs{
    		LinkIndex: ipvlan.Attrs().Index,
    		Parent:    netlink.HANDLE_MIN_INGRESS,
    		Handle:    netlink.MakeHandle(0, 1),
    		Protocol:  3, // NOTE(review): presumably ETH_P_ALL (all protocols) — confirm
    		Priority:  1,
    	}
    	ingressFilter := &netlink.BpfFilter{
    		FilterAttrs:  ingressAttrs,
    		Fd:           parser.FD(),
    		Name:         "tc/" + name,
    		DirectAction: true,
    	}
    	// A successful delete means a stale filter was still attached.
    	if err := netlink.FilterDel(ingressFilter); err == nil {
    		ilog.Warn("ingress filter already existed. Deleted it")
    	}
    	if err := netlink.FilterAdd(ingressFilter); err != nil {
    		if !errors.Is(err, fs.ErrExist) {
    			return nil, fmt.Errorf("failed to create ingress filter: %w", err)
    		}
    		ilog.WithError(err).Warn("ingress filter already exists. Ignoring")
    	}
    	return ingressFilter, nil
    }
    
    // registerIngress obtains the ingress filter for iface from fetchIngressEvents,
    // using the TcIngressPcaParse program, and records it in p.ingressFilters.
    // If fetchIngressEvents fails, its error is returned unchanged and nothing is
    // recorded.
    func (p *PacketFetcher) registerIngress(iface ifaces.Interface, ipvlan netlink.Link) error {
    	filter, err := fetchIngressEvents(iface, ipvlan, p.objects.TcIngressPcaParse, "tc_ingress_pca_parse")
    	if err == nil {
    		// Keep the filter so Close can delete it later.
    		p.ingressFilters[iface] = filter
    	}
    	return err
    }
    
    // Close the eBPF fetcher from the system.
    // We don't need an "Close(iface)" method because the filters and qdiscs
    // are automatically removed when the interface is down
    func (p *PacketFetcher) Close() error {
    	log.Debug("unregistering eBPF objects")
    
    	var errs []error
    	if p.perfReader != nil {
    		if err := p.perfReader.Close(); err != nil {
    			errs = append(errs, err)
    		}
    	}
    	if p.objects != nil {
    
    		if err := p.objects.TcEgressPcaParse.Close(); err != nil {
    			errs = append(errs, err)
    		}
    		if err := p.objects.TcIngressPcaParse.Close(); err != nil {
    
    		if err := p.objects.TcxEgressPcaParse.Close(); err != nil {
    			errs = append(errs, err)
    		}
    		if err := p.objects.TcxIngressPcaParse.Close(); err != nil {
    
    			errs = append(errs, err)
    		}
    		if err := p.objects.PacketRecord.Close(); err != nil {
    			errs = append(errs, err)
    		}
    		p.objects = nil
    	}
    	for iface, ef := range p.egressFilters {
    		log.WithField("interface", iface).Debug("deleting egress filter")
    		if err := netlink.FilterDel(ef); err != nil {
    			errs = append(errs, fmt.Errorf("deleting egress filter: %w", err))
    		}
    	}
    	p.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{}
    	for iface, igf := range p.ingressFilters {
    		log.WithField("interface", iface).Debug("deleting ingress filter")
    		if err := netlink.FilterDel(igf); err != nil {
    			errs = append(errs, fmt.Errorf("deleting ingress filter: %w", err))
    		}
    	}
    	p.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{}
    	for iface, qd := range p.qdiscs {
    		log.WithField("interface", iface).Debug("deleting Qdisc")
    		if err := netlink.QdiscDel(qd); err != nil {
    			errs = append(errs, fmt.Errorf("deleting qdisc: %w", err))
    		}
    	}
    	p.qdiscs = map[ifaces.Interface]*netlink.GenericQdisc{}
    	if len(errs) == 0 {
    		return nil
    	}
    
    
    	for iface, l := range p.egressTCXLink {
    		log := log.WithField("interface", iface)
    		log.Debug("detach egress TCX hook")
    		l.Close()
    
    	}
    	p.egressTCXLink = map[ifaces.Interface]link.Link{}
    	for iface, l := range p.ingressTCXLink {
    		log := log.WithField("interface", iface)
    		log.Debug("detach ingress TCX hook")
    		l.Close()
    	}
    	p.ingressTCXLink = map[ifaces.Interface]link.Link{}
    
    
    	var errStrings []string
    	for _, err := range errs {
    		errStrings = append(errStrings, err.Error())
    	}
    	return errors.New(`errors: "` + strings.Join(errStrings, `", "`) + `"`)
    }
    
    // ReadPerf delegates to the fetcher's perf reader and returns the record and
    // error from its Read call unchanged.
    func (p *PacketFetcher) ReadPerf() (perf.Record, error) {
    	record, err := p.perfReader.Read()
    	return record, err
    }
    
    
    func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte {
    
    	if !p.lookupAndDeleteSupported {
    		return p.legacyLookupAndDeleteMap(met)
    	}
    
    
    	packetMap := p.objects.PacketRecord
    	iterator := packetMap.Iterate()
    	packets := make(map[int][]*byte, p.cacheMaxSize)
    	var id int
    
    
    	// First, get all ids and ignore content (we need lookup+delete to be atomic)
    
    		ids = append(ids, id)
    	}
    
    	// Run the atomic Lookup+Delete; if new ids have been inserted in the meantime, they'll be fetched next time
    	for i, id := range ids {
    		if err := packetMap.LookupAndDelete(&id, &packet); err != nil {
    			if i == 0 && errors.Is(err, ebpf.ErrNotSupported) {
    				log.WithError(err).Warnf("switching to legacy mode")
    				p.lookupAndDeleteSupported = false
    				return p.legacyLookupAndDeleteMap(met)
    			}
    			log.WithError(err).WithField("packetID", id).Warnf("couldn't delete entry")
    			met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry").Inc()