diff --git a/controller/controller.go b/controller/controller.go index fbc888b9c1e19804fa7edcaa55127bfcb4527f06..b025135c9d12900b9792789717a962e53d8d7b9d 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -128,7 +128,7 @@ func initialize() error { return err } - c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.pndStore) + c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.pndStore, c.eventService) c.networkElementWatcher.SubToNetworkElements(config.GetGnmiSubscriptionPaths(), nil) err = ensureDefaultRoleExists() diff --git a/controller/event/event.go b/controller/event/event.go index 4f9aaef8db2bc5a702fac335c62d4bb44030acd4..8eb6cf8d558e580f58a59b31f9d448891380e548 100644 --- a/controller/event/event.go +++ b/controller/event/event.go @@ -7,6 +7,7 @@ type Event struct { ID uuid.UUID `json:"id,omitempty"` EntityID uuid.UUID `json:"entity_id,omitempty"` Type string `json:"type,omitempty"` + Paths []string `json:"paths,omitempty"` } const ( @@ -46,3 +47,13 @@ func NewUpdateEvent(entityID uuid.UUID) Event { Type: TypeUpdate, } } + +// NewMneUpdateEvent creates a new update event for managed network elements. +func NewMneUpdateEvent(entityID uuid.UUID, paths []string) Event { + return Event{ + ID: uuid.New(), + EntityID: entityID, + Type: TypeUpdate, + Paths: paths, + } +} diff --git a/controller/event/event_test.go b/controller/event/event_test.go index 501c6e713b0a39909b84b9095b541576770352e0..6e27bc082428a75965e8c65ed5921b09176c8281 100644 --- a/controller/event/event_test.go +++ b/controller/event/event_test.go @@ -120,3 +120,40 @@ func TestNewUpdateEvent(t *testing.T) { }) } } + +func TestNewMneUpdateEvent(t *testing.T) { + type args struct { + entityID uuid.UUID + } + tests := []struct { + name string + args args + want Event + }{ + { + name: "should create a new update event", + args: args{ + entityID: getTestEntityUUID(), + }, + want: Event{ + ID: uuid.New(), + EntityID: getTestEntityUUID(), + Type: TypeUpdate, + Paths: []string{"some/random/path"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := NewMneUpdateEvent(tt.args.entityID, []string{"some/random/path"}) + + if !reflect.DeepEqual(got.EntityID, tt.want.EntityID) { + t.Errorf("NewMneUpdateEvent().EntityID = %v, want %v", got, tt.want) + } + + if !reflect.DeepEqual(got.Type, tt.want.Type) { + t.Errorf("NewMneUpdateEvent().Type = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/controller/nucleus/networkElementService.go b/controller/nucleus/networkElementService.go index c300bc6a947618832a3ef07d23dcbcb9ebf97e5f..b6c0979b3410d82b4821824d6564e2ca14e0abff 100644 --- a/controller/nucleus/networkElementService.go +++ b/controller/nucleus/networkElementService.go @@ -137,7 +137,8 @@ func (s *NetworkElementService) UpdateModel(networkElementToUpdate networkelemen return err } - pubEvent := event.NewUpdateEvent(networkElementToUpdate.ID()) + // TODO (faseid): check if we want to add the paths here instead of empty string array! + pubEvent := event.NewMneUpdateEvent(networkElementToUpdate.ID(), []string{}) if err := s.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil { go func() { s.eventService.Reconnect() @@ -159,7 +160,8 @@ func (s *NetworkElementService) Update(networkElementToUpdate networkelement.Net return err } - pubEvent := event.NewUpdateEvent(networkElementToUpdate.ID()) + // TODO (faseid): check if we want to add the paths here instead of empty string array! + pubEvent := event.NewMneUpdateEvent(networkElementToUpdate.ID(), []string{}) if err := s.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil { go func() { s.eventService.Reconnect() diff --git a/controller/nucleus/networkElementWatcher.go b/controller/nucleus/networkElementWatcher.go index d3d0cc20c66e5da30923ffaf2e9f8f476bac9e3a..5cb76b0a822fe766cc999b3b80d72e091e2f79ca 100644 --- a/controller/nucleus/networkElementWatcher.go +++ b/controller/nucleus/networkElementWatcher.go @@ -5,6 +5,8 @@ import ( "fmt" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" + "code.fbi.h-da.de/danet/gosdn/controller/event" + eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" @@ -28,6 +30,7 @@ const ( type NetworkElementWatcher struct { pndStore networkdomain.PndStore networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper + eventService eventInterfaces.Service } // networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine. @@ -37,10 +40,11 @@ type networkelementSubscriptionHelper struct { } // NewNetworkElementWatcher takes a pndStore to subscribe to network element paths. -func NewNetworkElementWatcher(pndStore networkdomain.PndStore) *NetworkElementWatcher { +func NewNetworkElementWatcher(pndStore networkdomain.PndStore, eventService eventInterfaces.Service) *NetworkElementWatcher { return &NetworkElementWatcher{ pndStore: pndStore, networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper), + eventService: eventService, } } @@ -160,9 +164,27 @@ func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.Subscrib err = mne.Transport().ProcessControlPlaneSubscribeResponse(resp, mne.GetModel(), mne.SBI().Schema()) if err != nil { log.Error(err) - } else { - if err := pnd.UpdateNetworkElementAfterSubscribeResponse(mne); err != nil { - log.Error(err) - } + } // else { + // if err := pnd.UpdateNetworkElementAfterSubscribeResponse(mne); err != nil { + // log.Error(err) + // } + // } + + paths := []string{} + + for _, update := range resp.Update.Update { + paths = append(paths, update.String()) + } + + pubEvent := event.NewMneUpdateEvent(mne.ID(), paths) + if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil { + go func() { + n.eventService.Reconnect() + + retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } }