Skip to content
Snippets Groups Projects
Commit fe2dd176 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

test new way of subscribing, send path info in event

parent 37ecfb8a
No related branches found
No related tags found
1 merge request!405Resolve "To improve the device watching mechanism a fetch all after time interval method should be implemented"
This commit is part of merge request !405. Comments created here will be created in the context of that merge request.
......@@ -128,7 +128,7 @@ func initialize() error {
return err
}
c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.pndStore)
c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.pndStore, c.eventService)
c.networkElementWatcher.SubToNetworkElements(config.GetGnmiSubscriptionPaths(), nil)
err = ensureDefaultRoleExists()
......
......@@ -7,6 +7,7 @@ type Event struct {
ID uuid.UUID `json:"id,omitempty"`
EntityID uuid.UUID `json:"entity_id,omitempty"`
Type string `json:"type,omitempty"`
Paths []string `json:"paths,omitempty"`
}
const (
......@@ -46,3 +47,13 @@ func NewUpdateEvent(entityID uuid.UUID) Event {
Type: TypeUpdate,
}
}
// NewMneUpdateEvent creates a new update event for managed network elements.
func NewMneUpdateEvent(entityID uuid.UUID, paths []string) Event {
return Event{
ID: uuid.New(),
EntityID: entityID,
Type: TypeUpdate,
Paths: paths,
}
}
......@@ -120,3 +120,40 @@ func TestNewUpdateEvent(t *testing.T) {
})
}
}
func TestNewMneUpdateEvent(t *testing.T) {
type args struct {
entityID uuid.UUID
}
tests := []struct {
name string
args args
want Event
}{
{
name: "should create a new update event",
args: args{
entityID: getTestEntityUUID(),
},
want: Event{
ID: uuid.New(),
EntityID: getTestEntityUUID(),
Type: TypeUpdate,
Paths: []string{"some/random/path"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := NewMneUpdateEvent(tt.args.entityID, []string{"some/random/path"})
if !reflect.DeepEqual(got.EntityID, tt.want.EntityID) {
t.Errorf("NewMneUpdateEvent().EntityID = %v, want %v", got, tt.want)
}
if !reflect.DeepEqual(got.Type, tt.want.Type) {
t.Errorf("NewMneUpdateEvent().Type = %v, want %v", got, tt.want)
}
})
}
}
......@@ -137,7 +137,8 @@ func (s *NetworkElementService) UpdateModel(networkElementToUpdate networkelemen
return err
}
pubEvent := event.NewUpdateEvent(networkElementToUpdate.ID())
// TODO (faseid): check if we want to add the paths here instead of empty string array!
pubEvent := event.NewMneUpdateEvent(networkElementToUpdate.ID(), []string{})
if err := s.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
go func() {
s.eventService.Reconnect()
......@@ -159,7 +160,8 @@ func (s *NetworkElementService) Update(networkElementToUpdate networkelement.Net
return err
}
pubEvent := event.NewUpdateEvent(networkElementToUpdate.ID())
// TODO (faseid): check if we want to add the paths here instead of empty string array!
pubEvent := event.NewMneUpdateEvent(networkElementToUpdate.ID(), []string{})
if err := s.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
go func() {
s.eventService.Reconnect()
......
......@@ -5,6 +5,8 @@ import (
"fmt"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/event"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
......@@ -28,6 +30,7 @@ const (
type NetworkElementWatcher struct {
pndStore networkdomain.PndStore
networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper
eventService eventInterfaces.Service
}
// networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine.
......@@ -37,10 +40,11 @@ type networkelementSubscriptionHelper struct {
}
// NewNetworkElementWatcher takes a pndStore to subscribe to network element paths.
func NewNetworkElementWatcher(pndStore networkdomain.PndStore) *NetworkElementWatcher {
func NewNetworkElementWatcher(pndStore networkdomain.PndStore, eventService eventInterfaces.Service) *NetworkElementWatcher {
return &NetworkElementWatcher{
pndStore: pndStore,
networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper),
eventService: eventService,
}
}
......@@ -160,9 +164,27 @@ func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.Subscrib
err = mne.Transport().ProcessControlPlaneSubscribeResponse(resp, mne.GetModel(), mne.SBI().Schema())
if err != nil {
log.Error(err)
} else {
if err := pnd.UpdateNetworkElementAfterSubscribeResponse(mne); err != nil {
log.Error(err)
}
} // else {
// if err := pnd.UpdateNetworkElementAfterSubscribeResponse(mne); err != nil {
// log.Error(err)
// }
// }
paths := []string{}
for _, update := range resp.Update.Update {
paths = append(paths, update.String())
}
pubEvent := event.NewMneUpdateEvent(mne.ID(), paths)
if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
go func() {
n.eventService.Reconnect()
retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent)
if retryErr != nil {
log.Error(retryErr)
}
}()
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment