diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 145de5eaeaa8ae0528efbd43bb6bd9b45d1287a7..f631e20802462af2b767b9f9e8b4466384c6f220 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -9,6 +9,7 @@ workflow: - if: $CI_PIPELINE_SOURCE == 'push' stages: + - tools - build - test - analyze @@ -25,3 +26,4 @@ include: - local: "/.gitlab/ci/.integration-test-containerlab.yml" - local: "/.gitlab/ci/.integration-test.yml" - local: "/.gitlab/ci/.uml-autogen-ci.yml" + - local: "/.gitlab/ci/.renovate.yml" diff --git a/.gitlab/ci/.renovate.yml b/.gitlab/ci/.renovate.yml new file mode 100644 index 0000000000000000000000000000000000000000..ba873230bcbfb4741a682a90caf8225b0e9e54ed --- /dev/null +++ b/.gitlab/ci/.renovate.yml @@ -0,0 +1,12 @@ +renovate: + stage: tools + + image: renovate/renovate:32.127.0-slim@sha256:30824333e0978851f96ac8e58e7afa39aaf996de243f8bfcb1d7d6906b46488d + + variables: + LOG_LEVEL: debug + + only: + - schedules + script: + - renovate $RENOVATE_EXTRA_FLAGS diff --git a/config.js b/config.js new file mode 100644 index 0000000000000000000000000000000000000000..466b8287b4148fcc4878a114b05966a6a57f1232 --- /dev/null +++ b/config.js @@ -0,0 +1,24 @@ +Object.assign(process.env, { + GIT_AUTHOR_NAME: 'Renovate Bot', + GIT_AUTHOR_EMAIL: 'renovate@danet.fbi.h-da.de', + GIT_COMMITTER_NAME: 'Renovate Bot', + GIT_COMMITTER_EMAIL: 'renovate@danet.fbi.h-da.de', +}); + +module.exports = { + endpoint: process.env.CI_API_V4_URL, + hostRules: [ + { + baseUrl: 'https://registry.code.fbi.h-da.de', + username: '@project_10161_bot', + password: process.env.GITLAB_REGISTRY_TOKEN, + }, + ], + platform: 'gitlab', + username: '@project_10161_bot', + gitAuthor: 'Renovate Bot <renovate@danet.fbi.h-da.de>', + autodiscover: true, + prConcurrentLimit: 10, + commitMessagePrefix: '[renovate]', + labels: ['renovate'] +}; diff --git a/controller/controller.go b/controller/controller.go index aadd8206d5ba61390bae3896717990c389e11dd4..6ea0fb68c34168df90896266c35353811ef7196a 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -128,7 +128,7 @@ func initialize() error { } c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.pndStore) - c.networkElementWatcher.SubToDevices(config.GetGnmiSubscriptionPaths(), nil) + c.networkElementWatcher.SubToNetworkElements(config.GetGnmiSubscriptionPaths(), nil) err = ensureDefaultRoleExists() if err != nil { diff --git a/controller/customerrs/errors.go b/controller/customerrs/errors.go index d5197bd0ce948fd7a735ba8b4181c49f213a4ab6..9873f10b3ac2e5beba60331361c19fedc6881bdb 100644 --- a/controller/customerrs/errors.go +++ b/controller/customerrs/errors.go @@ -35,7 +35,7 @@ func (e *AlreadyExistsError) Error() string { // InvalidUUIDError implements the Error interface and is called if a UUID is not valid. type InvalidUUIDError struct { - DeviceName string + NetworkElementName string } func (e *InvalidUUIDError) Error() string { @@ -255,24 +255,24 @@ func (e AMQPMessageFailError) Error() string { // SubscribeResponseError implements the Error interface and is called if there is an issue during a ongoing // gNMI Subscription. type SubscribeResponseError struct { - PndID string - DeviceID string - DeviceName string - Err string + PndID string + NetworkElementID string + NetworkElementName string + Err string } func (e SubscribeResponseError) Error() string { - return fmt.Sprintf("Subscribe failed, PndID: %s, DeviceID: %s, DeviceName: %s, Internal error: %s", e.PndID, e.DeviceID, e.DeviceName, e.Err) + return fmt.Sprintf("Subscribe failed, PndID: %s, NetworkElementID: %s, NetworkElementName: %s, Internal error: %s", e.PndID, e.NetworkElementID, e.NetworkElementName, e.Err) } // SubscribeSyncResponseError implements the Error interface and is called if there is an issue syncing a // gNMI Subscription. type SubscribeSyncResponseError struct { - PndID string - DeviceID string - DeviceName string + PndID string + NetworkElementID string + NetworkElementName string } func (e SubscribeSyncResponseError) Error() string { - return fmt.Sprintf("Sync failed, PndID: %s, DeviceID: %s, DeviceName: %s", e.PndID, e.DeviceID, e.DeviceName) + return fmt.Sprintf("Sync failed, PndID: %s, NetworkElementID: %s, NetworkElementName: %s", e.PndID, e.NetworkElementID, e.NetworkElementName) } diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go index 8710655152b4db340ebfbebc2dd51d5d032370e3..7c822fc8a6b54e0d4cc07a094f9647c8f4cbd28b 100644 --- a/controller/eventService/Service.go +++ b/controller/eventService/Service.go @@ -3,6 +3,7 @@ package eventservice import ( "context" "encoding/json" + "time" "code.fbi.h-da.de/danet/gosdn/controller/config" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" @@ -11,13 +12,37 @@ import ( interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" amqp "github.com/rabbitmq/amqp091-go" + "github.com/sethvargo/go-retry" log "github.com/sirupsen/logrus" ) +const ( + // Duration to try reconnecting until cancelation. + reconnectDelay = 5 * time.Second + + // Duration to try re-initializing a channel. + reInitDelay = 2 * time.Second + + // Max number of tries to reconnect to Broker. + reconnectAttempts = 5 + + // Max number of tries to re-initialize a channel. As re-init is part of the reconnect process this multiplies + // with number of reconnectAttempts. + reInitAttempts = 5 + + // Max number of tries to republish a failed message. + rePublishAttempts = 5 +) + // EventService is used to setup a connection to a broker and publish events to topics. type EventService struct { - connection *amqp.Connection - channel *amqp.Channel + connectionAddr string + connection *amqp.Connection + channel *amqp.Channel + done chan bool + notifyConnClose chan *amqp.Error + notifyChanClose chan *amqp.Error + isReady bool } // NewEventService creates a new connection to the broker and opens a channel for later usage. @@ -27,20 +52,108 @@ func NewEventService() (interfaces.Service, error) { return NewMockEventService(), nil } - conn, err := amqp.Dial(amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort)) + addr := amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort) + + conn, err := connect(addr) + if err != nil { + return nil, err + } + + notifyConnClose := make(chan *amqp.Error) + conn.NotifyClose(notifyConnClose) + + ch, err := initChannel(conn) + if err != nil { + return nil, err + } + notifyChanClose := make(chan *amqp.Error) + ch.NotifyClose(notifyChanClose) + + return &EventService{ + connectionAddr: addr, + connection: conn, + channel: ch, + done: make(chan bool), + notifyConnClose: notifyConnClose, + notifyChanClose: notifyChanClose, + }, nil +} + +func connect(addr string) (*amqp.Connection, error) { + conn, err := amqp.Dial(addr) if err != nil { return nil, &customerrs.AMQPInitFailError{Action: "failed to connect to RabbitMQ", Err: err} } + return conn, nil +} + +// Reconnect attempts to setup a new connection to the RabbitMQ server after an disconnect. +func (e *EventService) Reconnect() { + i := 1 + for { + e.isReady = false + + conn, err := connect(e.connectionAddr) + + if err != nil { + select { + case <-e.done: + return + case <-time.After(reconnectDelay): + } + continue + } + + done := e.handleReInit(conn) + if done || i == reconnectAttempts { + break + } + i++ + } +} + +func (e *EventService) handleReInit(conn *amqp.Connection) bool { + i := 1 + for { + e.isReady = false + + ch, err := initChannel(conn) + if err != nil { + select { + case <-e.done: + return true + case <-time.After(reInitDelay): + i++ + } + if i == reInitAttempts { + return false + } + continue + } + + notifyConnClose := make(chan *amqp.Error) + conn.NotifyClose(notifyConnClose) + e.notifyConnClose = notifyConnClose + e.connection = conn + + notifyChanClose := make(chan *amqp.Error) + ch.NotifyClose(notifyChanClose) + e.notifyChanClose = notifyChanClose + e.channel = ch + e.isReady = true + + return true + } +} + +func initChannel(conn *amqp.Connection) (*amqp.Channel, error) { ch, err := conn.Channel() if err != nil { return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err} } - return &EventService{ - connection: conn, - channel: ch, - }, nil + return ch, nil } // PublishEvent declares a queue and publishes events. @@ -79,6 +192,23 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { return nil } +// RetryPublish is used to retry publishing an event after a failed attempt. +func (e *EventService) RetryPublish(topic string, event event.Event) error { + ctx := context.Background() + backOff := retry.NewFibonacci(time.Second) + + if err := retry.Do(ctx, retry.WithMaxRetries(rePublishAttempts, backOff), func(ctx context.Context) error { + if pubErr := e.PublishEvent(topic, event); pubErr != nil { + return retry.RetryableError(&customerrs.AMQPMessageFailError{Action: "retrying to publish message failed", Err: pubErr}) + } + return nil + }); err != nil { + return err + } + + return nil +} + // CloseConnection closes an exisiting connection. func (e *EventService) CloseConnection() { if err := e.connection.Close(); err != nil { diff --git a/controller/eventService/utils.go b/controller/eventService/utils.go index 891142fef2e7bf809daca4885fb58921d4e4f9e3..12bb8ef48eeae5a00a7989d07576b4ca3b522215 100644 --- a/controller/eventService/utils.go +++ b/controller/eventService/utils.go @@ -33,3 +33,15 @@ func (e *MockEventService) PublishEvent(topic string, event event.Event) error { // CloseConnection closes an exisiting connection. func (e *MockEventService) CloseConnection() { } + +// RetryPublish declares a queue and publishes events. +func (e *MockEventService) RetryPublish(topic string, event event.Event) error { + e.Queue[topic] = append(e.Queue[topic], event) + + return nil +} + +// Reconnect reconnects to the RabbitMQ server. +func (e *MockEventService) Reconnect() { + // do nothing +} diff --git a/controller/interfaces/event/service.go b/controller/interfaces/event/service.go index 73f3c97d2dc3a30e45804671fc6bbc827017d0e5..34b7e26fce234a6cca34277f4574d387fc70a2bf 100644 --- a/controller/interfaces/event/service.go +++ b/controller/interfaces/event/service.go @@ -6,4 +6,6 @@ import "code.fbi.h-da.de/danet/gosdn/controller/event" type Service interface { PublishEvent(topic string, event event.Event) error CloseConnection() + RetryPublish(topic string, event event.Event) error + Reconnect() } diff --git a/controller/interfaces/transport/transport.go b/controller/interfaces/transport/transport.go index c5b969a1b041d06e2a9f949cd888bd0c11a76a91..e5d1f807a95a9665cb33f6bbffed325f0611863c 100644 --- a/controller/interfaces/transport/transport.go +++ b/controller/interfaces/transport/transport.go @@ -32,8 +32,8 @@ type ( // for distinguishing from which network element the information is from, to stop subscriptions and // error handling. type SubscriptionInformation struct { - PndID string - DeviceID string - DeviceName string - StopContext context.Context + PndID string + NetworkElementID string + NetworkElementName string + StopContext context.Context } diff --git a/controller/mocks/Service.go b/controller/mocks/Service.go index 55386e1728dd94fc86e6124d703c385ef53e9182..de81b8be67e30949057a4e743a0e243f080b355b 100644 --- a/controller/mocks/Service.go +++ b/controller/mocks/Service.go @@ -32,6 +32,20 @@ func (_m *Service) PublishEvent(topic string, _a1 controllerevent.Event) error { return r0 } +// RetryPublish provides a mock function with given fields: topic, _a1 +func (_m *Service) RetryPublish(topic string, _a1 controllerevent.Event) error { + ret := _m.Called(topic, _a1) + + var r0 error + if rf, ok := ret.Get(0).(func(string, controllerevent.Event) error); ok { + r0 = rf(topic, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + type mockConstructorTestingTNewService interface { mock.TestingT Cleanup(func()) diff --git a/controller/nucleus/networkElementService.go b/controller/nucleus/networkElementService.go index 19e6a9b23dbde38612cd821908769affa6fead2a..b2af03f9ad0be11c3c63cfcc229a38eab318262b 100644 --- a/controller/nucleus/networkElementService.go +++ b/controller/nucleus/networkElementService.go @@ -98,8 +98,16 @@ func (s *NetworkElementService) Add(networkElementToAdd networkelement.NetworkEl return err } - if err := s.eventService.PublishEvent(NetworkElementEventTopic, event.NewAddEvent(networkElementToAdd.ID())); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(networkElementToAdd.ID()) + if err := s.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(NetworkElementEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -129,8 +137,16 @@ func (s *NetworkElementService) UpdateModel(networkElementToUpdate networkelemen return err } - if err := s.eventService.PublishEvent(NetworkElementEventTopic, event.NewUpdateEvent(networkElementToUpdate.ID())); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(networkElementToUpdate.ID()) + if err := s.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(NetworkElementEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -143,8 +159,16 @@ func (s *NetworkElementService) Update(networkElementToUpdate networkelement.Net return err } - if err := s.eventService.PublishEvent(NetworkElementEventTopic, event.NewUpdateEvent(networkElementToUpdate.ID())); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(networkElementToUpdate.ID()) + if err := s.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(NetworkElementEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -163,8 +187,16 @@ func (s *NetworkElementService) Delete(networkElementToDelete networkelement.Net } } - if err := s.eventService.PublishEvent(NetworkElementEventTopic, event.NewDeleteEvent(networkElementToDelete.ID())); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(networkElementToDelete.ID()) + if err := s.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(NetworkElementEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil diff --git a/controller/nucleus/networkElementWatcher.go b/controller/nucleus/networkElementWatcher.go index efbc97b3da1c84563ad78da31ed7559588565233..d3d0cc20c66e5da30923ffaf2e9f8f476bac9e3a 100644 --- a/controller/nucleus/networkElementWatcher.go +++ b/controller/nucleus/networkElementWatcher.go @@ -23,15 +23,15 @@ const ( gNMIStreamMode string = "on_change" ) -// NetworkElementWatcher is a component that subscribes to devices via gNMI from within the controller and handles +// NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles // responses by triggering the internal event process. type NetworkElementWatcher struct { - pndStore networkdomain.PndStore - deviceSubcriptions map[uuid.UUID]*deviceSubscriptionHelper + pndStore networkdomain.PndStore + networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper } -// deviceSubscriptionHelper is used to store information to stop a running subscribe go routine. -type deviceSubscriptionHelper struct { +// networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine. +type networkelementSubscriptionHelper struct { stopSubscribeCtx context.Context stopFunc context.CancelFunc } @@ -39,15 +39,15 @@ type deviceSubscriptionHelper struct { // NewNetworkElementWatcher takes a pndStore to subscribe to network element paths. func NewNetworkElementWatcher(pndStore networkdomain.PndStore) *NetworkElementWatcher { return &NetworkElementWatcher{ - pndStore: pndStore, - deviceSubcriptions: make(map[uuid.UUID]*deviceSubscriptionHelper), + pndStore: pndStore, + networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper), } } -// SubToDevices subscribes to every available network element in each network domain according to provided SubscribeOptions. +// SubToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions. // Paths should be provided in the following format [][]string{{"system", "config", "hostname"}} // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second). -func (d *NetworkElementWatcher) SubToDevices(paths [][]string, opts *gnmi.SubscribeOptions) { +func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnmi.SubscribeOptions) { if opts == nil { opts = &gnmi.SubscribeOptions{ Mode: gNMISubscribeMode, @@ -57,102 +57,102 @@ func (d *NetworkElementWatcher) SubToDevices(paths [][]string, opts *gnmi.Subscr } } - pnds, err := d.pndStore.GetAll() + pnds, err := n.pndStore.GetAll() if err != nil { log.Error(err) } for _, pnd := range pnds { - d.subscribeToPndDevices(pnd.ID().String(), pnd, opts) + n.subscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) } } -func (d *NetworkElementWatcher) subscribeToPndDevices(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) { +func (n *NetworkElementWatcher) subscribeToPndNetworkElements(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) { for _, mne := range pnd.NetworkElements() { subID := uuid.New() stopContext, cancel := context.WithCancel(context.Background()) - d.addToDeviceSubscriptions(subID, &deviceSubscriptionHelper{ + n.addToNetworkElementSubscriptions(subID, &networkelementSubscriptionHelper{ stopSubscribeCtx: stopContext, stopFunc: cancel, }) - go d.callSubscribe(stopContext, pndID, mne, opts) + go n.callSubscribe(stopContext, pndID, mne, opts) } } -func (d *NetworkElementWatcher) callSubscribe(stopContext context.Context, pndID string, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) { +func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, pndID string, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) { gNMIOptionsCtx := context.Background() gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) // SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check // from which network element a response was sent - if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, d.handleSubscribeResponse, &transport.SubscriptionInformation{ - PndID: pndID, - DeviceID: mne.ID().String(), - DeviceName: mne.Name(), - StopContext: stopContext, + if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, n.handleSubscribeResponse, &transport.SubscriptionInformation{ + PndID: pndID, + NetworkElementID: mne.ID().String(), + NetworkElementName: mne.Name(), + StopContext: stopContext, }); err != nil { log.Error(err) } } -func (d *NetworkElementWatcher) addToDeviceSubscriptions(subID uuid.UUID, devSub *deviceSubscriptionHelper) { +func (n *NetworkElementWatcher) addToNetworkElementSubscriptions(subID uuid.UUID, devSub *networkelementSubscriptionHelper) { //TODO: improve handling of subscriptions, like be able to expose to apps so specific subscriptions instead of only all can be stopped in the future - d.deviceSubcriptions[subID] = devSub + n.networkelementSubcriptions[subID] = devSub } -// StopAndRemoveAllDeviceSubscriptions stops and removes all the available running subscriptions. -func (d *NetworkElementWatcher) StopAndRemoveAllDeviceSubscriptions() { - for key := range d.deviceSubcriptions { - d.StopAndRemoveDeviceSubscription(key) +// StopAndRemoveAllNetworkElementSubscriptions stops and removes all the available running subscriptions. +func (n *NetworkElementWatcher) StopAndRemoveAllNetworkElementSubscriptions() { + for key := range n.networkelementSubcriptions { + n.StopAndRemoveNetworkElementSubscription(key) } } -// StopAndRemoveDeviceSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map +// StopAndRemoveNetworkElementSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map // of network element subscriptions. -func (d *NetworkElementWatcher) StopAndRemoveDeviceSubscription(subID uuid.UUID) { - d.deviceSubcriptions[subID].stopFunc() - delete(d.deviceSubcriptions, subID) +func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uuid.UUID) { + n.networkelementSubcriptions[subID].stopFunc() + delete(n.networkelementSubcriptions, subID) } // handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish // from which network element a subscribe response was sent including improved error handling. -func (d *NetworkElementWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) { +func (n *NetworkElementWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) { switch resp := resp.Response.(type) { case *gpb.SubscribeResponse_Error: log.Error(&customerrs.SubscribeResponseError{ - PndID: subscriptionInfo.PndID, - DeviceID: subscriptionInfo.DeviceID, - DeviceName: subscriptionInfo.DeviceName, - Err: fmt.Sprint("SubscribeResponse_Error"), + PndID: subscriptionInfo.PndID, + NetworkElementID: subscriptionInfo.NetworkElementID, + NetworkElementName: subscriptionInfo.NetworkElementName, + Err: fmt.Sprint("SubscribeResponse_Error"), }) case *gpb.SubscribeResponse_SyncResponse: if !resp.SyncResponse { log.Error(&customerrs.SubscribeSyncResponseError{ - PndID: subscriptionInfo.PndID, - DeviceID: subscriptionInfo.DeviceID, - DeviceName: subscriptionInfo.DeviceName, + PndID: subscriptionInfo.PndID, + NetworkElementID: subscriptionInfo.NetworkElementID, + NetworkElementName: subscriptionInfo.NetworkElementName, }) } case *gpb.SubscribeResponse_Update: - d.handleSubscribeResponseUpdate(resp, subscriptionInfo) + n.handleSubscribeResponseUpdate(resp, subscriptionInfo) default: log.Infof("Invalid SubscribeResponse, %v", resp) } } -func (d *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) { +func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) { pndID, err := uuid.Parse(subscriptionInfo.PndID) if err != nil { log.Error(err) } - pnd, err := d.pndStore.Get(store.Query{ID: pndID}) + pnd, err := n.pndStore.Get(store.Query{ID: pndID}) if err != nil { log.Error(err) } - mne, err := pnd.GetNetworkElement(subscriptionInfo.DeviceID) + mne, err := pnd.GetNetworkElement(subscriptionInfo.NetworkElementID) if err != nil { log.Error(err) } diff --git a/controller/nucleus/sbiService.go b/controller/nucleus/sbiService.go index 1b68b9017eba06fb9c59d7a4b27fd4ee52714ebf..fbf9ab5c6ddf162168936b7914f7cacefb42fd21 100644 --- a/controller/nucleus/sbiService.go +++ b/controller/nucleus/sbiService.go @@ -73,8 +73,16 @@ func (s *SbiService) Add(sbiToAdd southbound.SouthboundInterface) error { return err } - if err := s.eventService.PublishEvent(SbiEventTopic, event.NewAddEvent(sbiToAdd.ID())); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(sbiToAdd.ID()) + if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -94,8 +102,16 @@ func (s *SbiService) Delete(sbiToDelete southbound.SouthboundInterface) error { } } - if err := s.eventService.PublishEvent(SbiEventTopic, event.NewDeleteEvent(sbiToDelete.ID())); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(sbiToDelete.ID()) + if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil diff --git a/controller/rbac/rbacService.go b/controller/rbac/rbacService.go index a2b4a288ca6cabd9f72d0989575b9cf8aba9a54b..414ad1884b8fcfe43e6f5c16c6c6ddc3ff26d9da 100644 --- a/controller/rbac/rbacService.go +++ b/controller/rbac/rbacService.go @@ -40,8 +40,16 @@ func (s *UserService) Add(userToAdd rbac.User) error { return err } - if err := s.eventService.PublishEvent(UserEventTopic, event.NewAddEvent(userToAdd.ID())); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(userToAdd.ID()) + if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -54,8 +62,16 @@ func (s *UserService) Delete(userToDelete rbac.User) error { return err } - if err := s.eventService.PublishEvent(UserEventTopic, event.NewDeleteEvent(userToDelete.ID())); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(userToDelete.ID()) + if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -68,8 +84,16 @@ func (s *UserService) Update(userToUpdate rbac.User) error { return err } - if err := s.eventService.PublishEvent(UserEventTopic, event.NewUpdateEvent(userToUpdate.ID())); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(userToUpdate.ID()) + if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -126,8 +150,16 @@ func (s *RoleService) Add(roleToAdd rbac.Role) error { return err } - if err := s.eventService.PublishEvent(RoleEventTopic, event.NewAddEvent(roleToAdd.ID())); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(roleToAdd.ID()) + if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -140,10 +172,17 @@ func (s *RoleService) Delete(roleToDelete rbac.Role) error { return err } - if err := s.eventService.PublishEvent(RoleEventTopic, event.NewDeleteEvent(roleToDelete.ID())); err != nil { - log.Error(err) - } + pubEvent := event.NewDeleteEvent(roleToDelete.ID()) + if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() + } return nil } @@ -154,8 +193,16 @@ func (s *RoleService) Update(roleToUpdate rbac.Role) error { return err } - if err := s.eventService.PublishEvent(RoleEventTopic, event.NewUpdateEvent(roleToUpdate.ID())); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(roleToUpdate.ID()) + if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { + go func() { + s.eventService.Reconnect() + + retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil diff --git a/controller/store/utils.go b/controller/store/utils.go index 09746a91b72c123be7d8e8db14b426de9c2cf4b7..be2a8c5c610408ea7f1ab72fb9761ee303af9955 100644 --- a/controller/store/utils.go +++ b/controller/store/utils.go @@ -21,7 +21,7 @@ func FromString(id string) (uuid.UUID, error) { log.WithFields(log.Fields{ "identifier": id, }).Debug(err) - return uuid.Nil, &customerrs.InvalidUUIDError{DeviceName: id} + return uuid.Nil, &customerrs.InvalidUUIDError{NetworkElementName: id} } return idAsUUID, nil diff --git a/controller/topology/nodes/nodeService.go b/controller/topology/nodes/nodeService.go index 76e2de209bc13eafecaf5989178433a405ea531b..518626c15e1f524fe375661acb256baa129a41d7 100644 --- a/controller/topology/nodes/nodeService.go +++ b/controller/topology/nodes/nodeService.go @@ -60,8 +60,16 @@ func (n *NodeService) createNode(node Node) (Node, error) { return node, err } - if err := n.eventService.PublishEvent(NodeEventTopic, event.NewAddEvent(node.ID)); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(node.ID) + if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { + go func() { + n.eventService.Reconnect() + + retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return node, nil @@ -74,8 +82,16 @@ func (n *NodeService) Update(node Node) error { return err } - if err := n.eventService.PublishEvent(NodeEventTopic, event.NewUpdateEvent(node.ID)); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(node.ID) + if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { + go func() { + n.eventService.Reconnect() + + retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -88,8 +104,16 @@ func (n *NodeService) Delete(node Node) error { return err } - if err := n.eventService.PublishEvent(NodeEventTopic, event.NewDeleteEvent(node.ID)); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(node.ID) + if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { + go func() { + n.eventService.Reconnect() + + retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil diff --git a/controller/topology/ports/portService.go b/controller/topology/ports/portService.go index 1625155a0cb1f4578be24b70186fd36622b17290..25bfc09c6631dcb936449982a20322e266681b17 100644 --- a/controller/topology/ports/portService.go +++ b/controller/topology/ports/portService.go @@ -58,8 +58,16 @@ func (p *PortService) createPort(port Port) (Port, error) { return port, err } - if err := p.eventService.PublishEvent(PortEventTopic, event.NewAddEvent(port.ID)); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(port.ID) + if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { + go func() { + p.eventService.Reconnect() + + retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return port, nil @@ -72,8 +80,16 @@ func (p *PortService) Update(port Port) error { return err } - if err := p.eventService.PublishEvent(PortEventTopic, event.NewUpdateEvent(port.ID)); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(port.ID) + if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { + go func() { + p.eventService.Reconnect() + + retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -86,8 +102,16 @@ func (p *PortService) Delete(port Port) error { return err } - if err := p.eventService.PublishEvent(PortEventTopic, event.NewDeleteEvent(port.ID)); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(port.ID) + if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { + go func() { + p.eventService.Reconnect() + + retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil diff --git a/controller/topology/routing-tables/routingTableService.go b/controller/topology/routing-tables/routingTableService.go index eec4efb2fc4f7cfd9fd0f2c7fa01a5a2bff47955..2a1fc1348b56545cce10c527959fcb670c38a656 100644 --- a/controller/topology/routing-tables/routingTableService.go +++ b/controller/topology/routing-tables/routingTableService.go @@ -68,8 +68,16 @@ func (r *RoutingTableService) createRoutingTable(routingTable RoutingTable) (Rou return routingTable, err } - if err := r.eventService.PublishEvent(RoutingTableEventTopic, event.NewAddEvent(routingTable.ID)); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(routingTable.ID) + if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { + go func() { + r.eventService.Reconnect() + + retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return routingTable, nil @@ -82,8 +90,16 @@ func (r *RoutingTableService) Update(routingTable RoutingTable) error { return err } - if err := r.eventService.PublishEvent(RoutingTableEventTopic, event.NewUpdateEvent(routingTable.ID)); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(routingTable.ID) + if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { + go func() { + r.eventService.Reconnect() + + retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -96,8 +112,16 @@ func (r *RoutingTableService) Delete(routingTable RoutingTable) error { return err } - if err := r.eventService.PublishEvent(RoutingTableEventTopic, event.NewDeleteEvent(routingTable.ID)); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(routingTable.ID) + if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { + go func() { + r.eventService.Reconnect() + + retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil diff --git a/controller/topology/topologyService.go b/controller/topology/topologyService.go index 6949108435b88caf15e0b7b6e65bc9c8453018a8..68530269302514decd0e15d262a6907d3ee99a77 100644 --- a/controller/topology/topologyService.go +++ b/controller/topology/topologyService.go @@ -74,8 +74,16 @@ func (t *service) AddLink(link links.Link) error { return err } - if err := t.eventService.PublishEvent(LinkEventTopic, event.NewAddEvent(link.ID)); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(link.ID) + if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { + go func() { + t.eventService.Reconnect() + + retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -88,8 +96,16 @@ func (t *service) UpdateLink(link links.Link) error { return err } - if err := t.eventService.PublishEvent(LinkEventTopic, event.NewUpdateEvent(link.ID)); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(link.ID) + if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { + go func() { + t.eventService.Reconnect() + + retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil @@ -104,8 +120,16 @@ func (t *service) DeleteLink(link links.Link) error { return err } - if err := t.eventService.PublishEvent(LinkEventTopic, event.NewDeleteEvent(link.ID)); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(link.ID) + if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { + go func() { + t.eventService.Reconnect() + + retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } + }() } return nil diff --git a/go.mod b/go.mod index a0ce26ead8eec05f5c1f2f0d4b2a6505bc84676b..ebd186120a587195b9340d514202208c5374b69c 100644 --- a/go.mod +++ b/go.mod @@ -77,6 +77,7 @@ require ( github.com/prometheus/procfs v0.8.0 // indirect github.com/rabbitmq/amqp091-go v1.4.0 github.com/rivo/uniseg v0.2.0 // indirect + github.com/sethvargo/go-retry v0.2.3 github.com/spf13/afero v1.9.2 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect diff --git a/go.sum b/go.sum index a3118afe5814eae221be1e8e0af4eeff7ba7af70..58d7445567bb55cbf4f109bb8fb4486f93cc7f4c 100644 --- a/go.sum +++ b/go.sum @@ -862,6 +862,8 @@ github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sethvargo/go-password v0.2.0 h1:BTDl4CC/gjf/axHMaDQtw507ogrXLci6XRiLc7i/UHI= github.com/sethvargo/go-password v0.2.0/go.mod h1:Ym4Mr9JXLBycr02MFuVQ/0JHidNetSgbzutTr3zsYXE= +github.com/sethvargo/go-retry v0.2.3 h1:oYlgvIvsju3jNbottWABtbnoLC+GDtLdBHxKWxQm/iU= +github.com/sethvargo/go-retry v0.2.3/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=