Skip to content
Snippets Groups Projects
watcher.go 6.7 KiB
Newer Older
  • Learn to ignore specific revisions
  • 	"github.com/fsnotify/fsnotify"
    
    	"github.com/sirupsen/logrus"
    	"github.com/vishvananda/netlink"
    
    	"github.com/vishvananda/netns"
    
    	"k8s.io/apimachinery/pkg/util/wait"
    
    )
    
    // netnsVolume is the directory that is listed and watched to discover
    // network namespaces.
    const netnsVolume = "/var/run/netns"
    
    // Watcher uses the system's netlink to get real-time events about the addition or removal of
    // network interfaces.
    type Watcher struct {
    	// bufLen is the capacity of the Event channel created by Subscribe.
    	bufLen int

    	// current is the set of interfaces for which an EventAdded has been sent and no
    	// EventDeleted has followed. sendUpdates accesses it while holding mutex.
    	current map[Interface]struct{}

    	// interfaces returns the interfaces that sendUpdates reports as already existing for a
    	// namespace handle, before it starts forwarding link updates.
    	interfaces func(handle netns.NsHandle) ([]Interface, error)

    	// linkSubscriberAt abstracts the netlink.LinkSubscribeAt implementation, allowing the
    	// injection of mocks for unit testing.
    	linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error
    	mutex            *sync.Mutex
    	netnsWatcher     *fsnotify.Watcher

    	// nsDone holds, per network-namespace name, the channel handed to linkSubscriberAt as its
    	// done channel. sendUpdates closes the channel and removes the entry when it returns.
    	// NOTE(review): this map is read and written from the sendUpdates goroutines and from the
    	// netns notification goroutine, and no locking around those accesses is visible — confirm.
    	nsDone map[string]chan struct{}
    }

    // NewWatcher returns a Watcher whose Subscribe channel buffers up to bufLen events. It is
    // wired to the real netlink subscription (netlink.LinkSubscribeAt) and to netInterfaces for
    // listing interfaces.
    func NewWatcher(bufLen int) *Watcher {
    	return &Watcher{
    		bufLen:           bufLen,
    		current:          map[Interface]struct{}{},
    		interfaces:       netInterfaces,
    		linkSubscriberAt: netlink.LinkSubscribeAt,
    		mutex:            &sync.Mutex{},
    		netnsWatcher:     &fsnotify.Watcher{},
    		// Must be non-nil: entries are assigned into this map when a namespace is created,
    		// and assigning into a nil map panics.
    		nsDone: make(map[string]chan struct{}),
    	}
    }
    
    // Subscribe creates a buffered Event channel of capacity w.bufLen, starts sendUpdates
    // goroutines that write interface events to it, and calls netnsNotify so that
    // network-namespace creation and deletion are tracked as well.
    //
    // NOTE(review): this excerpt appears to be missing lines — the braces do not balance and no
    // return statement is visible — so the branching around the sendUpdates call and the values
    // returned to the caller cannot be confirmed from here.
    func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) {
    	out := make(chan Event, w.bufLen)
    
    		// The empty namespace name makes sendUpdates use netns.None().
    		go w.sendUpdates(ctx, "", out)
    
    		}
    	}
    	// Register for notifications when a netns is created or deleted, and subscribe to link
    	// updates for each new netns.
    	w.netnsNotify(ctx, out)
    
    // sendUpdates subscribes to netlink link updates in the network namespace named ns and
    // forwards them to out as interface events. An empty ns selects netns.None().
    //
    // Once subscribed, it first emits an EventAdded for each interface returned by w.interfaces.
    // It then turns each received link update into an EventAdded or EventDeleted, depending on
    // the link's flags and operational state, using w.current to suppress repeated events.
    //
    // NOTE(review): this excerpt appears to be missing lines (several blocks are never closed and
    // the end of the function is not visible) — confirm against the complete file before
    // changing it.
    func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
    	var netnsHandle netns.NsHandle
    	var err error
    
    	log := logrus.WithField("component", "ifaces.Watcher")
    
    	// doneChan is the channel registered for this namespace in w.nsDone; it is passed to the
    	// link subscriber below. On return it is closed and its map entry is removed.
    	doneChan := w.nsDone[ns]
    	defer func() {
    		close(doneChan)
    		delete(w.nsDone, ns)
    	}()
    
    	// subscribe for interface events
    	links := make(chan netlink.LinkUpdate)
    
    	// Poll every 50 microseconds, for at most one second, until both the namespace handle
    	// and the link subscription are obtained. The condition runs immediately on the first
    	// iteration. Returning (false, nil) from it requests another attempt instead of aborting.
    	if err = wait.PollUntilContextTimeout(ctx, 50*time.Microsecond, time.Second, true, func(ctx context.Context) (done bool, err error) {
    		if ns == "" {
    			netnsHandle = netns.None()
    		} else {
    			if netnsHandle, err = netns.GetFromName(ns); err != nil {
    				return false, nil
    			}
    		}
    
    		if err = w.linkSubscriberAt(netnsHandle, links, doneChan); err != nil {
    			log.WithFields(logrus.Fields{
    				"netns":       ns,
    				"netnsHandle": netnsHandle.String(),
    				"error":       err,
    			}).Debug("linkSubscribe failed retry")
    
    			// Close the handle used for this attempt; the next attempt assigns netnsHandle
    			// again.
    			if err := netnsHandle.Close(); err != nil {
    				log.WithError(err).Warn("netnsHandle close failed")
    			}
    
    			return false, nil
    		}
    
    		log.WithFields(logrus.Fields{
    			"netns":       ns,
    			"netnsHandle": netnsHandle.String(),
    		}).Debug("linkSubscribe to receive links update")
    		return true, nil
    	}); err != nil {
    		log.WithError(err).Errorf("can't subscribe to links netns %s netnsHandle %s", ns, netnsHandle.String())
    
    		return
    	}
    
    	// before sending netlink updates, send all the existing interfaces at the moment of starting
    	// the Watcher
    
    	// Only list interfaces when the handle is open or is the netns.None() handle.
    	if netnsHandle.IsOpen() || netnsHandle.Equal(netns.None()) {
    		if names, err := w.interfaces(netnsHandle); err != nil {
    
    			log.WithError(err).Error("can't fetch network interfaces. You might be missing flows")
    		} else {
    			for _, name := range names {
    				iface := Interface{Name: name.Name, Index: name.Index, NetNS: netnsHandle}
    				// The lock covers only the map write; the channel send happens after Unlock.
    				w.mutex.Lock()
    				w.current[iface] = struct{}{}
    				w.mutex.Unlock()
    				out <- Event{Type: EventAdded, Interface: iface}
    			}
    
    	// Consume link updates until the links channel is closed.
    	for link := range links {
    		attrs := link.Attrs()
    		if attrs == nil {
    			log.WithField("link", link).Debug("received link update without attributes. Ignoring")
    			continue
    		}
    
    		iface := Interface{Name: attrs.Name, Index: attrs.Index, NetNS: netnsHandle}
    		// NOTE(review): the matching Unlock is not visible in this excerpt; as shown, the sends
    		// on out below happen while the mutex is held — confirm against the complete file.
    		w.mutex.Lock()
    		// The link counts as up when at least one of IFF_UP / IFF_RUNNING is set and its
    		// operational state is OperUp; anything else takes the "down" branch.
    		if link.Flags&(syscall.IFF_UP|syscall.IFF_RUNNING) != 0 && attrs.OperState == netlink.OperUp {
    
    			log.WithFields(logrus.Fields{
    				"operstate": attrs.OperState,
    				"flags":     attrs.Flags,
    				"name":      attrs.Name,
    
    				"netns":     netnsHandle.String(),
    
    			// Emit EventAdded only the first time this interface is seen.
    			if _, ok := w.current[iface]; !ok {
    				w.current[iface] = struct{}{}
    				out <- Event{Type: EventAdded, Interface: iface}
    
    			}
    		} else {
    			log.WithFields(logrus.Fields{
    				"operstate": attrs.OperState,
    				"flags":     attrs.Flags,
    				"name":      attrs.Name,
    
    				"netns":     netnsHandle.String(),
    
    			}).Debug("Interface down or not running")
    
    			// Emit EventDeleted only for interfaces that were previously recorded.
    			if _, ok := w.current[iface]; ok {
    				delete(w.current, iface)
    				out <- Event{Type: EventDeleted, Interface: iface}
    
    // getNetNS reads the entries of the netnsVolume directory and logs each entry name as a
    // detected network namespace. If the directory cannot be read it logs a warning and returns
    // a nil slice together with the wrapped error.
    //
    // NOTE(review): no success-path return statement and no use of ns are visible in this
    // excerpt, so lines appear to be missing; what the returned slice contains cannot be
    // confirmed from here.
    func getNetNS() ([]string, error) {
    
    	log := logrus.WithField("component", "ifaces.Watcher")
    	files, err := os.ReadDir(netnsVolume)
    	if err != nil {
    
    		log.Warningf("can't detect any network-namespaces err: %v [Ignore if the agent privileged flag is not set]", err)
    
    		return nil, fmt.Errorf("failed to list network-namespaces: %w", err)
    	}
    

    	// An empty directory is only logged at debug level; it is not treated as an error.
    	if len(files) == 0 {
    		log.WithField("netns", files).Debug("empty network-namespaces list")
    
    	}
    	for _, f := range files {
    		// The directory entry's name is taken as the namespace name.
    		ns := f.Name()
    
    		log.WithFields(logrus.Fields{
    
    		}).Debug("Detected network-namespace")
    	}


    }
    
    func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
    	var err error
    	log := logrus.WithField("component", "ifaces.Watcher")
    
    	w.netnsWatcher, err = fsnotify.NewWatcher()
    	if err != nil {
    		log.WithError(err).Error("can't subscribe fsnotify")
    		return
    	}
    	// Start a goroutine to handle netns events
    	go func() {
    		for {
    			select {
    			case event, ok := <-w.netnsWatcher.Events:
    				if !ok {
    					return
    				}
    				if event.Op&fsnotify.Create == fsnotify.Create {
    					ns := filepath.Base(event.Name)
    
    					log.WithField("netns", ns).Debug("netns create notification")
    					if _, ok := w.nsDone[ns]; ok {
    						log.WithField("netns", ns).Debug("netns channel already exists, delete it")
    						delete(w.nsDone, ns)
    					}
    					w.nsDone[ns] = make(chan struct{})
    
    				if event.Op&fsnotify.Remove == fsnotify.Remove {
    					ns := filepath.Base(event.Name)
    					log.WithField("netns", ns).Debug("netns delete notification")
    					if _, ok := w.nsDone[ns]; ok {
    						w.nsDone[ns] <- struct{}{}
    					} else {
    						log.WithField("netns", ns).Debug("netns delete but there is no channel to send events to")
    					}
    				}
    
    			case err, ok := <-w.netnsWatcher.Errors:
    				if !ok {
    					return
    				}
    				log.WithError(err).Error("netns watcher detected an error")
    			}
    		}
    	}()
    
    	err = w.netnsWatcher.Add(netnsVolume)
    	if err != nil {
    
    		log.Warningf("failed to add watcher to netns directory err: %v [Ignore if the agent privileged flag is not set]", err)