Skip to content
Snippets Groups Projects
Unverified Commit 0e1a1031 authored by Mohamed S. Mahmoud's avatar Mohamed S. Mahmoud Committed by GitHub
Browse files

NETOBSERV-1805: threads are leaking with continous adding and deleting pods (#426)

parent 48d5bc84
Branches
Tags
No related merge requests found
...@@ -31,6 +31,7 @@ type Watcher struct { ...@@ -31,6 +31,7 @@ type Watcher struct {
linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error
mutex *sync.Mutex mutex *sync.Mutex
netnsWatcher *fsnotify.Watcher netnsWatcher *fsnotify.Watcher
nsDone map[string]chan struct{}
} }
func NewWatcher(bufLen int) *Watcher { func NewWatcher(bufLen int) *Watcher {
...@@ -41,17 +42,19 @@ func NewWatcher(bufLen int) *Watcher { ...@@ -41,17 +42,19 @@ func NewWatcher(bufLen int) *Watcher {
linkSubscriberAt: netlink.LinkSubscribeAt, linkSubscriberAt: netlink.LinkSubscribeAt,
mutex: &sync.Mutex{}, mutex: &sync.Mutex{},
netnsWatcher: &fsnotify.Watcher{}, netnsWatcher: &fsnotify.Watcher{},
nsDone: make(map[string]chan struct{}),
} }
} }
func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) { func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) {
out := make(chan Event, w.bufLen) out := make(chan Event, w.bufLen)
netns, err := getNetNS() netns, err := getNetNS()
if err != nil { if err != nil {
w.nsDone[""] = make(chan struct{})
go w.sendUpdates(ctx, "", out) go w.sendUpdates(ctx, "", out)
} else { } else {
for _, n := range netns { for _, n := range netns {
w.nsDone[n] = make(chan struct{})
go w.sendUpdates(ctx, n, out) go w.sendUpdates(ctx, n, out)
} }
} }
...@@ -64,9 +67,13 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) { ...@@ -64,9 +67,13 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
var netnsHandle netns.NsHandle var netnsHandle netns.NsHandle
var err error var err error
log := logrus.WithField("component", "ifaces.Watcher") log := logrus.WithField("component", "ifaces.Watcher")
doneChan := w.nsDone[ns]
defer func() {
close(doneChan)
delete(w.nsDone, ns)
}()
// subscribe for interface events // subscribe for interface events
links := make(chan netlink.LinkUpdate) links := make(chan netlink.LinkUpdate)
doneChan := make(chan struct{})
if err = wait.PollUntilContextTimeout(ctx, 50*time.Microsecond, time.Second, true, func(ctx context.Context) (done bool, err error) { if err = wait.PollUntilContextTimeout(ctx, 50*time.Microsecond, time.Second, true, func(ctx context.Context) (done bool, err error) {
if ns == "" { if ns == "" {
netnsHandle = netns.None() netnsHandle = netns.None()
...@@ -113,6 +120,7 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) { ...@@ -113,6 +120,7 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
} }
} }
} }
for link := range links { for link := range links {
attrs := link.Attrs() attrs := link.Attrs()
if attrs == nil { if attrs == nil {
...@@ -191,9 +199,23 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) { ...@@ -191,9 +199,23 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
} }
if event.Op&fsnotify.Create == fsnotify.Create { if event.Op&fsnotify.Create == fsnotify.Create {
ns := filepath.Base(event.Name) ns := filepath.Base(event.Name)
log.WithField("netns", ns).Debug("netns notification") log.WithField("netns", ns).Debug("netns create notification")
if _, ok := w.nsDone[ns]; ok {
log.WithField("netns", ns).Debug("netns channel already exists, delete it")
delete(w.nsDone, ns)
}
w.nsDone[ns] = make(chan struct{})
go w.sendUpdates(ctx, ns, out) go w.sendUpdates(ctx, ns, out)
} }
if event.Op&fsnotify.Remove == fsnotify.Remove {
ns := filepath.Base(event.Name)
log.WithField("netns", ns).Debug("netns delete notification")
if _, ok := w.nsDone[ns]; ok {
w.nsDone[ns] <- struct{}{}
} else {
log.WithField("netns", ns).Debug("netns delete but there is no channel to send events to")
}
}
case err, ok := <-w.netnsWatcher.Errors: case err, ok := <-w.netnsWatcher.Errors:
if !ok { if !ok {
return return
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment