Skip to content
Snippets Groups Projects
Commit d1058091 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

Resolve "To improve the device watching mechanism a fetch all after time...

Resolve "To improve the device watching mechanism a fetch all after time interval method should be implemented"

See merge request !405
parent e74a39a1
Branches
Tags
2 merge requests!405Resolve "To improve the device watching mechanism a fetch all after time interval method should be implemented",!382Develop
Pipeline #125874 passed
......@@ -128,7 +128,7 @@ func initialize() error {
return err
}
c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.pndStore)
c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.pndStore, c.eventService)
c.networkElementWatcher.SubToNetworkElements(config.GetGnmiSubscriptionPaths(), nil)
err = ensureDefaultRoleExists()
......
......@@ -4,9 +4,10 @@ import "github.com/google/uuid"
// Event is a event that can be published via the event service as payload.
type Event struct {
ID uuid.UUID `json:"id,omitempty"`
EntityID uuid.UUID `json:"entity_id,omitempty"`
Type string `json:"type,omitempty"`
ID uuid.UUID `json:"id,omitempty"`
EntityID uuid.UUID `json:"entity_id,omitempty"`
Type string `json:"type,omitempty"`
PathsAndValuesMap map[string]string `json:"paths_and_values,omitempty"`
}
const (
......@@ -46,3 +47,13 @@ func NewUpdateEvent(entityID uuid.UUID) Event {
Type: TypeUpdate,
}
}
// NewMneUpdateEvent creates a new update event for managed network elements.
func NewMneUpdateEvent(entityID uuid.UUID, pathsAndValues map[string]string) Event {
return Event{
ID: uuid.New(),
EntityID: entityID,
Type: TypeUpdate,
PathsAndValuesMap: pathsAndValues,
}
}
......@@ -120,3 +120,46 @@ func TestNewUpdateEvent(t *testing.T) {
})
}
}
func TestNewMneUpdateEvent(t *testing.T) {
type args struct {
entityID uuid.UUID
pathsAndValuesMap map[string]string
}
tests := []struct {
name string
args args
want Event
}{
{
name: "should create a new update event",
args: args{
entityID: getTestEntityUUID(),
pathsAndValuesMap: map[string]string{"some/random/path": "val"},
},
want: Event{
ID: uuid.New(),
EntityID: getTestEntityUUID(),
Type: TypeUpdate,
PathsAndValuesMap: map[string]string{"some/random/path": "val"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := NewMneUpdateEvent(tt.args.entityID, tt.args.pathsAndValuesMap)
if !reflect.DeepEqual(got.EntityID, tt.want.EntityID) {
t.Errorf("NewMneUpdateEvent().EntityID = %v, want %v", got, tt.want)
}
if !reflect.DeepEqual(got.Type, tt.want.Type) {
t.Errorf("NewMneUpdateEvent().Type = %v, want %v", got, tt.want)
}
if !reflect.DeepEqual(got.PathsAndValuesMap, tt.want.PathsAndValuesMap) {
t.Errorf("NewMneUpdateEvent().PathsAndValuesMap = %v, want %v", got, tt.want)
}
})
}
}
......@@ -137,7 +137,8 @@ func (s *NetworkElementService) UpdateModel(networkElementToUpdate networkelemen
return err
}
pubEvent := event.NewUpdateEvent(networkElementToUpdate.ID())
// TODO (faseid): check if we want to add the paths with values here instead of empty map!
pubEvent := event.NewMneUpdateEvent(networkElementToUpdate.ID(), map[string]string{})
if err := s.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
go func() {
s.eventService.Reconnect()
......@@ -159,7 +160,8 @@ func (s *NetworkElementService) Update(networkElementToUpdate networkelement.Net
return err
}
pubEvent := event.NewUpdateEvent(networkElementToUpdate.ID())
// TODO (faseid): check if we want to add the paths with values here instead of empty map!
pubEvent := event.NewMneUpdateEvent(networkElementToUpdate.ID(), map[string]string{})
if err := s.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
go func() {
s.eventService.Reconnect()
......
......@@ -3,8 +3,11 @@ package nucleus
import (
"context"
"fmt"
"strings"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/event"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
......@@ -28,6 +31,7 @@ const (
type NetworkElementWatcher struct {
pndStore networkdomain.PndStore
networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper
eventService eventInterfaces.Service
}
// networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine.
......@@ -37,10 +41,11 @@ type networkelementSubscriptionHelper struct {
}
// NewNetworkElementWatcher takes a pndStore to subscribe to network element paths.
func NewNetworkElementWatcher(pndStore networkdomain.PndStore) *NetworkElementWatcher {
func NewNetworkElementWatcher(pndStore networkdomain.PndStore, eventService eventInterfaces.Service) *NetworkElementWatcher {
return &NetworkElementWatcher{
pndStore: pndStore,
networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper),
eventService: eventService,
}
}
......@@ -63,11 +68,11 @@ func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnm
}
for _, pnd := range pnds {
n.subscribeToPndNetworkElements(pnd.ID().String(), pnd, opts)
n.subscribeToPndNetworkElements(pnd, opts)
}
}
func (n *NetworkElementWatcher) subscribeToPndNetworkElements(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) {
func (n *NetworkElementWatcher) subscribeToPndNetworkElements(pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) {
for _, mne := range pnd.NetworkElements() {
subID := uuid.New()
......@@ -76,7 +81,7 @@ func (n *NetworkElementWatcher) subscribeToPndNetworkElements(pndID string, pnd
stopSubscribeCtx: stopContext,
stopFunc: cancel,
})
go n.callSubscribe(stopContext, pndID, mne, opts)
go n.callSubscribe(stopContext, pnd.ID().String(), mne, opts)
}
}
......@@ -160,9 +165,32 @@ func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.Subscrib
err = mne.Transport().ProcessControlPlaneSubscribeResponse(resp, mne.GetModel(), mne.SBI().Schema())
if err != nil {
log.Error(err)
} else {
if err := pnd.UpdateNetworkElementAfterSubscribeResponse(mne); err != nil {
log.Error(err)
}
pathsAndValues := make(map[string]string, 0)
for _, update := range resp.Update.Update {
pathString := ""
// go through elem to build full path
for _, elem := range update.Path.Elem {
// remove unwanted parts of path string example: "name:\"system\"" -> "system"
filteredElem := elem.String()[strings.Index(elem.String(), ":\"")+2 : len(elem.String())-1]
pathString += filteredElem + "/"
}
pathsAndValues[pathString] = update.Val.GetStringVal()
}
pubEvent := event.NewMneUpdateEvent(mne.ID(), pathsAndValues)
if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
go func() {
n.eventService.Reconnect()
retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent)
if retryErr != nil {
log.Error(retryErr)
}
}()
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment