Skip to content
Snippets Groups Projects
Commit 0f9bc850 authored by Mohamed Mahmoud's avatar Mohamed Mahmoud
Browse files

bpfman needs to have ns string instead of the nshandle

parent b4e1a909
Branches
Tags
No related merge requests found
...@@ -36,9 +36,10 @@ type Event struct { ...@@ -36,9 +36,10 @@ type Event struct {
} }
type Interface struct { type Interface struct {
Name string Name string
Index int Index int
NetNS netns.NsHandle NetNS netns.NsHandle
NSName string
} }
// Informer provides notifications about each network interface that is added or removed // Informer provides notifications about each network interface that is added or removed
...@@ -48,7 +49,7 @@ type Informer interface { ...@@ -48,7 +49,7 @@ type Informer interface {
Subscribe(ctx context.Context) (<-chan Event, error) Subscribe(ctx context.Context) (<-chan Event, error)
} }
func netInterfaces(nsh netns.NsHandle) ([]Interface, error) { func netInterfaces(nsh netns.NsHandle, ns string) ([]Interface, error) {
handle, err := netlink.NewHandleAt(nsh) handle, err := netlink.NewHandleAt(nsh)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create handle for netns (%s): %w", nsh.String(), err) return nil, fmt.Errorf("failed to create handle for netns (%s): %w", nsh.String(), err)
...@@ -63,7 +64,7 @@ func netInterfaces(nsh netns.NsHandle) ([]Interface, error) { ...@@ -63,7 +64,7 @@ func netInterfaces(nsh netns.NsHandle) ([]Interface, error) {
names := make([]Interface, len(links)) names := make([]Interface, len(links))
for i, link := range links { for i, link := range links {
names[i] = Interface{Name: link.Attrs().Name, Index: link.Attrs().Index, NetNS: nsh} names[i] = Interface{Name: link.Attrs().Name, Index: link.Attrs().Index, NetNS: nsh, NSName: ns}
} }
return names, nil return names, nil
} }
...@@ -13,7 +13,7 @@ import ( ...@@ -13,7 +13,7 @@ import (
type Poller struct { type Poller struct {
period time.Duration period time.Duration
current map[Interface]struct{} current map[Interface]struct{}
interfaces func(handle netns.NsHandle) ([]Interface, error) interfaces func(handle netns.NsHandle, ns string) ([]Interface, error)
bufLen int bufLen int
} }
...@@ -58,7 +58,7 @@ func (np *Poller) pollForEvents(ctx context.Context, ns string, out chan Event) ...@@ -58,7 +58,7 @@ func (np *Poller) pollForEvents(ctx context.Context, ns string, out chan Event)
defer ticker.Stop() defer ticker.Stop()
for { for {
if ifaces, err := np.interfaces(netnsHandle); err != nil { if ifaces, err := np.interfaces(netnsHandle, ns); err != nil {
log.WithError(err).Warn("fetching interface names") log.WithError(err).Warn("fetching interface names")
} else { } else {
log.WithField("names", ifaces).Debug("fetched interface names") log.WithField("names", ifaces).Debug("fetched interface names")
......
...@@ -19,12 +19,12 @@ func TestPoller(t *testing.T) { ...@@ -19,12 +19,12 @@ func TestPoller(t *testing.T) {
// fake net.Interfaces implementation that returns two different sets of // fake net.Interfaces implementation that returns two different sets of
// interfaces on successive invocations // interfaces on successive invocations
firstInvocation := true firstInvocation := true
var fakeInterfaces = func(_ netns.NsHandle) ([]Interface, error) { var fakeInterfaces = func(_ netns.NsHandle, _ string) ([]Interface, error) {
if firstInvocation { if firstInvocation {
firstInvocation = false firstInvocation = false
return []Interface{{"foo", 1, netns.None()}, {"bar", 2, netns.None()}}, nil return []Interface{{"foo", 1, netns.None(), ""}, {"bar", 2, netns.None(), ""}}, nil
} }
return []Interface{{"foo", 1, netns.None()}, {"bae", 3, netns.None()}}, nil return []Interface{{"foo", 1, netns.None(), ""}, {"bae", 3, netns.None(), ""}}, nil
} }
poller := NewPoller(5*time.Millisecond, 10) poller := NewPoller(5*time.Millisecond, 10)
poller.interfaces = fakeInterfaces poller.interfaces = fakeInterfaces
...@@ -33,17 +33,17 @@ func TestPoller(t *testing.T) { ...@@ -33,17 +33,17 @@ func TestPoller(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// first poll: two interfaces are added // first poll: two interfaces are added
assert.Equal(t, assert.Equal(t,
Event{Type: EventAdded, Interface: Interface{"foo", 1, netns.None()}}, Event{Type: EventAdded, Interface: Interface{"foo", 1, netns.None(), ""}},
getEvent(t, updates, timeout)) getEvent(t, updates, timeout))
assert.Equal(t, assert.Equal(t,
Event{Type: EventAdded, Interface: Interface{"bar", 2, netns.None()}}, Event{Type: EventAdded, Interface: Interface{"bar", 2, netns.None(), ""}},
getEvent(t, updates, timeout)) getEvent(t, updates, timeout))
// second poll: one interface is added and another is removed // second poll: one interface is added and another is removed
assert.Equal(t, assert.Equal(t,
Event{Type: EventAdded, Interface: Interface{"bae", 3, netns.None()}}, Event{Type: EventAdded, Interface: Interface{"bae", 3, netns.None(), ""}},
getEvent(t, updates, timeout)) getEvent(t, updates, timeout))
assert.Equal(t, assert.Equal(t,
Event{Type: EventDeleted, Interface: Interface{"bar", 2, netns.None()}}, Event{Type: EventDeleted, Interface: Interface{"bar", 2, netns.None(), ""}},
getEvent(t, updates, timeout)) getEvent(t, updates, timeout))
// successive polls: no more events are forwarded // successive polls: no more events are forwarded
select { select {
......
...@@ -17,8 +17,8 @@ func TestRegisterer(t *testing.T) { ...@@ -17,8 +17,8 @@ func TestRegisterer(t *testing.T) {
watcher := NewWatcher(10) watcher := NewWatcher(10)
registry := NewRegisterer(watcher, 10) registry := NewRegisterer(watcher, 10)
// mock net.Interfaces and linkSubscriber to control which interfaces are discovered // mock net.Interfaces and linkSubscriber to control which interfaces are discovered
watcher.interfaces = func(_ netns.NsHandle) ([]Interface, error) { watcher.interfaces = func(_ netns.NsHandle, _ string) ([]Interface, error) {
return []Interface{{"foo", 1, netns.None()}, {"bar", 2, netns.None()}, {"baz", 3, netns.None()}}, nil return []Interface{{"foo", 1, netns.None(), ""}, {"bar", 2, netns.None(), ""}, {"baz", 3, netns.None(), ""}}, nil
} }
inputLinks := make(chan netlink.LinkUpdate, 10) inputLinks := make(chan netlink.LinkUpdate, 10)
watcher.linkSubscriberAt = func(_ netns.NsHandle, ch chan<- netlink.LinkUpdate, _ <-chan struct{}) error { watcher.linkSubscriberAt = func(_ netns.NsHandle, ch chan<- netlink.LinkUpdate, _ <-chan struct{}) error {
......
...@@ -27,7 +27,7 @@ var log = logrus.WithField("component", "ifaces.Watcher") ...@@ -27,7 +27,7 @@ var log = logrus.WithField("component", "ifaces.Watcher")
type Watcher struct { type Watcher struct {
bufLen int bufLen int
current map[Interface]struct{} current map[Interface]struct{}
interfaces func(handle netns.NsHandle) ([]Interface, error) interfaces func(handle netns.NsHandle, ns string) ([]Interface, error)
// linkSubscriber abstracts netlink.LinkSubscribe implementation, allowing the injection of // linkSubscriber abstracts netlink.LinkSubscribe implementation, allowing the injection of
// mocks for unit testing // mocks for unit testing
linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error
...@@ -114,11 +114,11 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) { ...@@ -114,11 +114,11 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
// before sending netlink updates, send all the existing interfaces at the moment of starting // before sending netlink updates, send all the existing interfaces at the moment of starting
// the Watcher // the Watcher
if netnsHandle.IsOpen() || netnsHandle.Equal(netns.None()) { if netnsHandle.IsOpen() || netnsHandle.Equal(netns.None()) {
if names, err := w.interfaces(netnsHandle); err != nil { if names, err := w.interfaces(netnsHandle, ns); err != nil {
log.WithError(err).Error("can't fetch network interfaces. You might be missing flows") log.WithError(err).Error("can't fetch network interfaces. You might be missing flows")
} else { } else {
for _, name := range names { for _, name := range names {
iface := Interface{Name: name.Name, Index: name.Index, NetNS: netnsHandle} iface := Interface{Name: name.Name, Index: name.Index, NetNS: netnsHandle, NSName: ns}
w.mutex.Lock() w.mutex.Lock()
w.current[iface] = struct{}{} w.current[iface] = struct{}{}
w.mutex.Unlock() w.mutex.Unlock()
...@@ -133,7 +133,7 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) { ...@@ -133,7 +133,7 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
log.WithField("link", link).Debug("received link update without attributes. Ignoring") log.WithField("link", link).Debug("received link update without attributes. Ignoring")
continue continue
} }
iface := Interface{Name: attrs.Name, Index: attrs.Index, NetNS: netnsHandle} iface := Interface{Name: attrs.Name, Index: attrs.Index, NetNS: netnsHandle, NSName: ns}
w.mutex.Lock() w.mutex.Lock()
if link.Flags&(syscall.IFF_UP|syscall.IFF_RUNNING) != 0 && attrs.OperState == netlink.OperUp { if link.Flags&(syscall.IFF_UP|syscall.IFF_RUNNING) != 0 && attrs.OperState == netlink.OperUp {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
......
...@@ -19,8 +19,8 @@ func TestWatcher(t *testing.T) { ...@@ -19,8 +19,8 @@ func TestWatcher(t *testing.T) {
watcher := NewWatcher(10) watcher := NewWatcher(10)
// mock net.Interfaces and linkSubscriber to control which interfaces are discovered // mock net.Interfaces and linkSubscriber to control which interfaces are discovered
watcher.interfaces = func(_ netns.NsHandle) ([]Interface, error) { watcher.interfaces = func(_ netns.NsHandle, _ string) ([]Interface, error) {
return []Interface{{"foo", 1, netns.None()}, {"bar", 2, netns.None()}, {"baz", 3, netns.None()}}, nil return []Interface{{"foo", 1, netns.None(), ""}, {"bar", 2, netns.None(), ""}, {"baz", 3, netns.None(), ""}}, nil
} }
inputLinks := make(chan netlink.LinkUpdate, 10) inputLinks := make(chan netlink.LinkUpdate, 10)
watcher.linkSubscriberAt = func(_ netns.NsHandle, ch chan<- netlink.LinkUpdate, _ <-chan struct{}) error { watcher.linkSubscriberAt = func(_ netns.NsHandle, ch chan<- netlink.LinkUpdate, _ <-chan struct{}) error {
...@@ -37,23 +37,23 @@ func TestWatcher(t *testing.T) { ...@@ -37,23 +37,23 @@ func TestWatcher(t *testing.T) {
// initial set of fetched elements // initial set of fetched elements
assert.Equal(t, assert.Equal(t,
Event{Type: EventAdded, Interface: Interface{"foo", 1, netns.None()}}, Event{Type: EventAdded, Interface: Interface{"foo", 1, netns.None(), ""}},
getEvent(t, outputEvents, timeout)) getEvent(t, outputEvents, timeout))
assert.Equal(t, assert.Equal(t,
Event{Type: EventAdded, Interface: Interface{"bar", 2, netns.None()}}, Event{Type: EventAdded, Interface: Interface{"bar", 2, netns.None(), ""}},
getEvent(t, outputEvents, timeout)) getEvent(t, outputEvents, timeout))
assert.Equal(t, assert.Equal(t,
Event{Type: EventAdded, Interface: Interface{"baz", 3, netns.None()}}, Event{Type: EventAdded, Interface: Interface{"baz", 3, netns.None(), ""}},
getEvent(t, outputEvents, timeout)) getEvent(t, outputEvents, timeout))
// updates // updates
inputLinks <- upAndRunning("bae", 4, netns.None()) inputLinks <- upAndRunning("bae", 4, netns.None())
inputLinks <- down("bar", 2, netns.None()) inputLinks <- down("bar", 2, netns.None())
assert.Equal(t, assert.Equal(t,
Event{Type: EventAdded, Interface: Interface{"bae", 4, netns.None()}}, Event{Type: EventAdded, Interface: Interface{"bae", 4, netns.None(), ""}},
getEvent(t, outputEvents, timeout)) getEvent(t, outputEvents, timeout))
assert.Equal(t, assert.Equal(t,
Event{Type: EventDeleted, Interface: Interface{"bar", 2, netns.None()}}, Event{Type: EventDeleted, Interface: Interface{"bar", 2, netns.None(), ""}},
getEvent(t, outputEvents, timeout)) getEvent(t, outputEvents, timeout))
// repeated updates that do not involve a change in the current track of interfaces // repeated updates that do not involve a change in the current track of interfaces
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment