diff --git a/controller/controller.go b/controller/controller.go index 029e684ece84729e9dcde9a672e72f5f7be71866..bf65b11b5fb2b7691635a8771061300e86130b7d 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -44,15 +44,15 @@ var coreOnce sync.Once // Core is the representation of the controller's core type Core struct { - pndStore networkdomain.PndStore - userService rbac.UserService - roleService rbac.RoleService - httpServer *http.Server - grpcServer *grpc.Server - nbi *nbi.NorthboundInterface - eventService eventInterfaces.Service - stopChan chan os.Signal - subToDataPlaneComp *nucleus.SubToDataPlane + pndStore networkdomain.PndStore + userService rbac.UserService + roleService rbac.RoleService + httpServer *http.Server + grpcServer *grpc.Server + nbi *nbi.NorthboundInterface + eventService eventInterfaces.Service + stopChan chan os.Signal + deviceWatcher *nucleus.DeviceWatcher csbiClient cpb.CsbiServiceClient } @@ -79,9 +79,9 @@ func initialize() error { stopChan: make(chan os.Signal, 1), } - c.subToDataPlaneComp = nucleus.NewSubToDataPlane(c.pndStore) + c.deviceWatcher = nucleus.NewDeviceWatcher(c.pndStore) //TODO: remove this call after testing! - c.subToDataPlaneComp.SubToDevices() + c.deviceWatcher.SubToDevices([][]string{{"system", "config", "hostname"}}) // Setting up signal capturing signal.Notify(c.stopChan, os.Interrupt, syscall.SIGTERM) diff --git a/controller/interfaces/transport/transport.go b/controller/interfaces/transport/transport.go index 51f6f3b1dd8b3d2d80e1560fe415d57ed980f941..a08b6b29b7bd6f3cb61ceb72175d26621cff4c02 100644 --- a/controller/interfaces/transport/transport.go +++ b/controller/interfaces/transport/transport.go @@ -21,5 +21,6 @@ type Transport interface { } type ( + // HandleSubscribeResponse is the callback function to handle subcription responses HandleSubscribeResponse func(*gpb.SubscribeResponse) ) diff --git a/controller/nucleus/deviceWatcher.go b/controller/nucleus/deviceWatcher.go new file mode 100644 index 0000000000000000000000000000000000000000..8eb0bb7a5f27500ed82c989cc23242e416e479bc --- /dev/null +++ b/controller/nucleus/deviceWatcher.go @@ -0,0 +1,70 @@ +package nucleus + +import ( + "context" + "fmt" + + "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" + "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" + "code.fbi.h-da.de/danet/gosdn/controller/nucleus/types" + "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi" + gpb "github.com/openconfig/gnmi/proto/gnmi" + log "github.com/sirupsen/logrus" +) + +const ( + subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds + // TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI + gNMISubscribeMode string = "stream" + gNMIStreamMode string = "sample" +) + +// DeviceWatcher is a component that subscribes to devices via gNMI from within the controller and handles +// responses by triggering the internal event process. +type DeviceWatcher struct { + pndStore networkdomain.PndStore +} + +// NewDeviceWatcher takes a pndStore to subscribe to device paths. +func NewDeviceWatcher(pndStore networkdomain.PndStore) *DeviceWatcher { + return &DeviceWatcher{ + pndStore: pndStore, + } +} + +// SubToDevices subscribes to every available device in each network domain with fixed gNMI subscription options (streaming in sample mode each second). +// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}} +func (d *DeviceWatcher) SubToDevices(paths [][]string) { + opts := &gnmi.SubscribeOptions{ + Mode: gNMISubscribeMode, + StreamMode: gNMIStreamMode, + Paths: paths, + SampleInterval: subscribeSampleInterval, + } + + pnds, err := d.pndStore.GetAll() + if err != nil { + log.Error(err) + } + + for _, pnd := range pnds { + d.subscribeToPndDevices(pnd, opts) + } +} + +func (d *DeviceWatcher) subscribeToPndDevices(pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) { + for _, device := range pnd.Devices() { + go d.callSubscribe(device, opts) + } +} + +func (d *DeviceWatcher) callSubscribe(device device.Device, opts *gnmi.SubscribeOptions) { + ctx := context.Background() + ctx = context.WithValue(ctx, types.CtxKeyOpts, opts) + + device.Transport().SubscribeInternal(ctx, handleSubscribeRepsonse) +} + +func handleSubscribeRepsonse(resp *gpb.SubscribeResponse) { + fmt.Printf("YEP HANDLER CALLED Type:%T ExampleMessage:%s\n", resp, resp.String()) +} diff --git a/controller/nucleus/gnmi_transport.go b/controller/nucleus/gnmi_transport.go index b250076bd2d7f4300dc97097294dd1ad61990f8f..f16673377a3643c5b2c967b9e72454ec8817c96c 100644 --- a/controller/nucleus/gnmi_transport.go +++ b/controller/nucleus/gnmi_transport.go @@ -202,6 +202,8 @@ func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error { return g.subscribe(ctx) } +// SubscribeInternal is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context, +// the callback function handles the responses received from the subscription. func (g *Gnmi) SubscribeInternal(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse) error { if g.client == nil { return &errors.ErrNilClient{} diff --git a/controller/nucleus/subToDataPlane.go b/controller/nucleus/subToDataPlane.go deleted file mode 100644 index a9f3c0ef7d0c76c800a6d8870651a9fb48467459..0000000000000000000000000000000000000000 --- a/controller/nucleus/subToDataPlane.go +++ /dev/null @@ -1,58 +0,0 @@ -package nucleus - -import ( - "context" - "fmt" - - "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" - "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" - "code.fbi.h-da.de/danet/gosdn/controller/nucleus/types" - "code.fbi.h-da.de/danet/gosdn/controller/store" - "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi" - "github.com/google/uuid" - gpb "github.com/openconfig/gnmi/proto/gnmi" - log "github.com/sirupsen/logrus" -) - -//TODO: rename file and go struct! - -type SubToDataPlane struct { - pndStore networkdomain.PndStore -} - -func NewSubToDataPlane(pndStore networkdomain.PndStore) *SubToDataPlane { - return &SubToDataPlane{ - pndStore: pndStore, - } -} - -func (s *SubToDataPlane) SubToDevices() { - - //TODO: make this configurable and move away from arista stuff?! - opts := &gnmi.SubscribeOptions{ - Mode: "stream", - StreamMode: "sample", - Paths: [][]string{{"system", "config", "hostname"}}, - SampleInterval: 1000000000, - } - - pnd, err := s.pndStore.Get(store.Query{ID: uuid.MustParse("5f20f34b-cbd0-4511-9ddc-c50cf6a3b49d")}) - if err != nil { - log.Error(err) - } - - for _, d := range pnd.Devices() { - go s.callSubscribe(d, opts) - } -} - -func (s *SubToDataPlane) callSubscribe(device device.Device, opts *gnmi.SubscribeOptions) { - ctx := context.Background() - ctx = context.WithValue(ctx, types.CtxKeyOpts, opts) - - device.Transport().SubscribeInternal(ctx, handleSubscribeRepsonse) -} - -func handleSubscribeRepsonse(resp *gpb.SubscribeResponse) { - fmt.Printf("YEP HANDLER CALLED Type:%T ExampleMessage:%s\n", resp, resp.String()) -}