From 901160fb700db5af9fc6ad017f58602ad7393dbb Mon Sep 17 00:00:00 2001
From: Fabian Seidl <fabian.seidl@h-da.de>
Date: Thu, 18 Aug 2022 17:09:22 +0200
Subject: [PATCH] added retry for event publish

---
 controller/eventService/Service.go            |  22 ++++
 controller/eventService/utils.go              |   7 ++
 controller/interfaces/event/service.go        |   1 +
 controller/mocks/Service.go                   | 100 +++---------------
 controller/mocks/Store.go                     |  77 ++++++++------
 controller/nucleus/deviceService.go           |  28 +++--
 controller/nucleus/sbiService.go              |  14 ++-
 controller/rbac/rbacService.go                |  43 +++++---
 controller/topology/nodes/nodeService.go      |  21 ++--
 controller/topology/ports/portService.go      |  21 ++--
 .../routing-tables/routingTableService.go     |  21 ++--
 controller/topology/topologyService.go        |  21 ++--
 go.mod                                        |   1 +
 go.sum                                        |   6 +-
 14 files changed, 211 insertions(+), 172 deletions(-)

diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go
index 871065515..e891e4ad9 100644
--- a/controller/eventService/Service.go
+++ b/controller/eventService/Service.go
@@ -3,6 +3,7 @@ package eventservice
 import (
 	"context"
 	"encoding/json"
+	"time"
 
 	"code.fbi.h-da.de/danet/gosdn/controller/config"
 	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
@@ -11,6 +12,7 @@ import (
 	interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
 
 	amqp "github.com/rabbitmq/amqp091-go"
+	"github.com/sethvargo/go-retry"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -20,6 +22,10 @@ type EventService struct {
 	channel    *amqp.Channel
 }
 
+const (
+	rePublishAttempts = 5
+)
+
 // NewEventService creates a new connection to the broker and opens a channel for later usage.
 func NewEventService() (interfaces.Service, error) {
 	// TODO: This is an fugly hack to mitigate that some tests are trying to connect to an actual broker. (staester)
@@ -79,6 +85,22 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
 	return nil
 }
 
+func (e *EventService) RetryPublish(topic string, event event.Event) error {
+	ctx := context.Background()
+	backOff := retry.NewConstant(2 * time.Second)
+
+	if err := retry.Do(ctx, retry.WithMaxRetries(rePublishAttempts, backOff), func(ctx context.Context) error {
+		if pubErr := e.PublishEvent(topic, event); pubErr != nil {
+			return retry.RetryableError(&customerrs.AMQPMessageFailError{Action: "retrying to publish message failed", Err: pubErr})
+		}
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // CloseConnection closes an exisiting connection.
 func (e *EventService) CloseConnection() {
 	if err := e.connection.Close(); err != nil {
diff --git a/controller/eventService/utils.go b/controller/eventService/utils.go
index 891142fef..400018196 100644
--- a/controller/eventService/utils.go
+++ b/controller/eventService/utils.go
@@ -33,3 +33,10 @@ func (e *MockEventService) PublishEvent(topic string, event event.Event) error {
 // CloseConnection closes an exisiting connection.
 func (e *MockEventService) CloseConnection() {
 }
+
+// RetryPublish declares a queue and publishes events.
+func (e *MockEventService) RetryPublish(topic string, event event.Event) error {
+	e.Queue[topic] = append(e.Queue[topic], event)
+
+	return nil
+}
diff --git a/controller/interfaces/event/service.go b/controller/interfaces/event/service.go
index 73f3c97d2..41ec92361 100644
--- a/controller/interfaces/event/service.go
+++ b/controller/interfaces/event/service.go
@@ -6,4 +6,5 @@ import "code.fbi.h-da.de/danet/gosdn/controller/event"
 type Service interface {
 	PublishEvent(topic string, event event.Event) error
 	CloseConnection()
+	RetryPublish(topic string, event event.Event) error
 }
diff --git a/controller/mocks/Service.go b/controller/mocks/Service.go
index 188cd0c9c..de81b8be6 100644
--- a/controller/mocks/Service.go
+++ b/controller/mocks/Service.go
@@ -3,10 +3,9 @@
 package mocks
 
 import (
-	device "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
-	mock "github.com/stretchr/testify/mock"
+	controllerevent "code.fbi.h-da.de/danet/gosdn/controller/event"
 
-	store "code.fbi.h-da.de/danet/gosdn/controller/store"
+	mock "github.com/stretchr/testify/mock"
 )
 
 // Service is an autogenerated mock type for the Service type
@@ -14,87 +13,18 @@ type Service struct {
 	mock.Mock
 }
 
-// Add provides a mock function with given fields: _a0
-func (_m *Service) Add(_a0 device.Device) error {
-	ret := _m.Called(_a0)
-
-	var r0 error
-	if rf, ok := ret.Get(0).(func(device.Device) error); ok {
-		r0 = rf(_a0)
-	} else {
-		r0 = ret.Error(0)
-	}
-
-	return r0
-}
-
-// Delete provides a mock function with given fields: _a0
-func (_m *Service) Delete(_a0 device.Device) error {
-	ret := _m.Called(_a0)
-
-	var r0 error
-	if rf, ok := ret.Get(0).(func(device.Device) error); ok {
-		r0 = rf(_a0)
-	} else {
-		r0 = ret.Error(0)
-	}
-
-	return r0
-}
-
-// Get provides a mock function with given fields: _a0
-func (_m *Service) Get(_a0 store.Query) (device.Device, error) {
-	ret := _m.Called(_a0)
-
-	var r0 device.Device
-	if rf, ok := ret.Get(0).(func(store.Query) device.Device); ok {
-		r0 = rf(_a0)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(device.Device)
-		}
-	}
-
-	var r1 error
-	if rf, ok := ret.Get(1).(func(store.Query) error); ok {
-		r1 = rf(_a0)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// GetAll provides a mock function with given fields:
-func (_m *Service) GetAll() ([]device.Device, error) {
-	ret := _m.Called()
-
-	var r0 []device.Device
-	if rf, ok := ret.Get(0).(func() []device.Device); ok {
-		r0 = rf()
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).([]device.Device)
-		}
-	}
-
-	var r1 error
-	if rf, ok := ret.Get(1).(func() error); ok {
-		r1 = rf()
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
+// CloseConnection provides a mock function with given fields:
+func (_m *Service) CloseConnection() {
+	_m.Called()
 }
 
-// Update provides a mock function with given fields: _a0
-func (_m *Service) Update(_a0 device.Device) error {
-	ret := _m.Called(_a0)
+// PublishEvent provides a mock function with given fields: topic, _a1
+func (_m *Service) PublishEvent(topic string, _a1 controllerevent.Event) error {
+	ret := _m.Called(topic, _a1)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(device.Device) error); ok {
-		r0 = rf(_a0)
+	if rf, ok := ret.Get(0).(func(string, controllerevent.Event) error); ok {
+		r0 = rf(topic, _a1)
 	} else {
 		r0 = ret.Error(0)
 	}
@@ -102,13 +32,13 @@ func (_m *Service) Update(_a0 device.Device) error {
 	return r0
 }
 
-// UpdateModel provides a mock function with given fields: _a0, _a1
-func (_m *Service) UpdateModel(_a0 device.Device, _a1 string) error {
-	ret := _m.Called(_a0, _a1)
+// RetryPublish provides a mock function with given fields: topic, _a1
+func (_m *Service) RetryPublish(topic string, _a1 controllerevent.Event) error {
+	ret := _m.Called(topic, _a1)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(device.Device, string) error); ok {
-		r0 = rf(_a0, _a1)
+	if rf, ok := ret.Get(0).(func(string, controllerevent.Event) error); ok {
+		r0 = rf(topic, _a1)
 	} else {
 		r0 = ret.Error(0)
 	}
diff --git a/controller/mocks/Store.go b/controller/mocks/Store.go
index f8f71cd7d..6ce0fd985 100644
--- a/controller/mocks/Store.go
+++ b/controller/mocks/Store.go
@@ -3,8 +3,8 @@
 package mocks
 
 import (
-	southbound "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
-	store "code.fbi.h-da.de/danet/gosdn/controller/store"
+	store "code.fbi.h-da.de/danet/gosdn/controller/interfaces/store"
+	uuid "github.com/google/uuid"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -13,13 +13,13 @@ type Store struct {
 	mock.Mock
 }
 
-// Add provides a mock function with given fields: _a0
-func (_m *Store) Add(_a0 southbound.SouthboundInterface) error {
-	ret := _m.Called(_a0)
+// Add provides a mock function with given fields: item
+func (_m *Store) Add(item store.Storable) error {
+	ret := _m.Called(item)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(southbound.SouthboundInterface) error); ok {
-		r0 = rf(_a0)
+	if rf, ok := ret.Get(0).(func(store.Storable) error); ok {
+		r0 = rf(item)
 	} else {
 		r0 = ret.Error(0)
 	}
@@ -27,13 +27,13 @@ func (_m *Store) Add(_a0 southbound.SouthboundInterface) error {
 	return r0
 }
 
-// Delete provides a mock function with given fields: _a0
-func (_m *Store) Delete(_a0 southbound.SouthboundInterface) error {
-	ret := _m.Called(_a0)
+// Delete provides a mock function with given fields: id
+func (_m *Store) Delete(id uuid.UUID) error {
+	ret := _m.Called(id)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(southbound.SouthboundInterface) error); ok {
-		r0 = rf(_a0)
+	if rf, ok := ret.Get(0).(func(uuid.UUID) error); ok {
+		r0 = rf(id)
 	} else {
 		r0 = ret.Error(0)
 	}
@@ -41,20 +41,36 @@ func (_m *Store) Delete(_a0 southbound.SouthboundInterface) error {
 	return r0
 }
 
-// Get provides a mock function with given fields: _a0
-func (_m *Store) Get(_a0 store.Query) (southbound.LoadedSbi, error) {
-	ret := _m.Called(_a0)
+// Exists provides a mock function with given fields: id
+func (_m *Store) Exists(id uuid.UUID) bool {
+	ret := _m.Called(id)
 
-	var r0 southbound.LoadedSbi
-	if rf, ok := ret.Get(0).(func(store.Query) southbound.LoadedSbi); ok {
-		r0 = rf(_a0)
+	var r0 bool
+	if rf, ok := ret.Get(0).(func(uuid.UUID) bool); ok {
+		r0 = rf(id)
 	} else {
-		r0 = ret.Get(0).(southbound.LoadedSbi)
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// Get provides a mock function with given fields: id
+func (_m *Store) Get(id uuid.UUID) (store.Storable, error) {
+	ret := _m.Called(id)
+
+	var r0 store.Storable
+	if rf, ok := ret.Get(0).(func(uuid.UUID) store.Storable); ok {
+		r0 = rf(id)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(store.Storable)
+		}
 	}
 
 	var r1 error
-	if rf, ok := ret.Get(1).(func(store.Query) error); ok {
-		r1 = rf(_a0)
+	if rf, ok := ret.Get(1).(func(uuid.UUID) error); ok {
+		r1 = rf(id)
 	} else {
 		r1 = ret.Error(1)
 	}
@@ -62,27 +78,20 @@ func (_m *Store) Get(_a0 store.Query) (southbound.LoadedSbi, error) {
 	return r0, r1
 }
 
-// GetAll provides a mock function with given fields:
-func (_m *Store) GetAll() ([]southbound.LoadedSbi, error) {
+// UUIDs provides a mock function with given fields:
+func (_m *Store) UUIDs() []uuid.UUID {
 	ret := _m.Called()
 
-	var r0 []southbound.LoadedSbi
-	if rf, ok := ret.Get(0).(func() []southbound.LoadedSbi); ok {
+	var r0 []uuid.UUID
+	if rf, ok := ret.Get(0).(func() []uuid.UUID); ok {
 		r0 = rf()
 	} else {
 		if ret.Get(0) != nil {
-			r0 = ret.Get(0).([]southbound.LoadedSbi)
+			r0 = ret.Get(0).([]uuid.UUID)
 		}
 	}
 
-	var r1 error
-	if rf, ok := ret.Get(1).(func() error); ok {
-		r1 = rf()
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
+	return r0
 }
 
 type mockConstructorTestingTNewStore interface {
diff --git a/controller/nucleus/deviceService.go b/controller/nucleus/deviceService.go
index 7e2e00c8a..c96496f39 100644
--- a/controller/nucleus/deviceService.go
+++ b/controller/nucleus/deviceService.go
@@ -80,8 +80,11 @@ func (s *DeviceService) Add(deviceToAdd device.Device) error {
 		return err
 	}
 
-	if err := s.eventService.PublishEvent(DeviceEventTopic, event.NewAddEvent(deviceToAdd.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewAddEvent(deviceToAdd.ID())
+	if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -111,8 +114,11 @@ func (s *DeviceService) UpdateModel(deviceToUpdate device.Device, modelAsString
 		return err
 	}
 
-	if err := s.eventService.PublishEvent(DeviceEventTopic, event.NewUpdateEvent(deviceToUpdate.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewUpdateEvent(deviceToUpdate.ID())
+	if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -125,8 +131,11 @@ func (s *DeviceService) Update(deviceToUpdate device.Device) error {
 		return err
 	}
 
-	if err := s.eventService.PublishEvent(DeviceEventTopic, event.NewUpdateEvent(deviceToUpdate.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewUpdateEvent(deviceToUpdate.ID())
+	if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -145,8 +154,11 @@ func (s *DeviceService) Delete(deviceToDelete device.Device) error {
 		}
 	}
 
-	if err := s.eventService.PublishEvent(DeviceEventTopic, event.NewDeleteEvent(deviceToDelete.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewDeleteEvent(deviceToDelete.ID())
+	if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
diff --git a/controller/nucleus/sbiService.go b/controller/nucleus/sbiService.go
index 8c1902431..50d4f4e77 100644
--- a/controller/nucleus/sbiService.go
+++ b/controller/nucleus/sbiService.go
@@ -73,8 +73,11 @@ func (s *SbiService) Add(sbiToAdd southbound.SouthboundInterface) error {
 		return err
 	}
 
-	if err := s.eventService.PublishEvent(SbiEventTopic, event.NewAddEvent(sbiToAdd.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewAddEvent(sbiToAdd.ID())
+	if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -94,8 +97,11 @@ func (s *SbiService) Delete(sbiToDelete southbound.SouthboundInterface) error {
 		}
 	}
 
-	if err := s.eventService.PublishEvent(SbiEventTopic, event.NewDeleteEvent(sbiToDelete.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewDeleteEvent(sbiToDelete.ID())
+	if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
diff --git a/controller/rbac/rbacService.go b/controller/rbac/rbacService.go
index a2b4a288c..37499c35b 100644
--- a/controller/rbac/rbacService.go
+++ b/controller/rbac/rbacService.go
@@ -40,8 +40,11 @@ func (s *UserService) Add(userToAdd rbac.User) error {
 		return err
 	}
 
-	if err := s.eventService.PublishEvent(UserEventTopic, event.NewAddEvent(userToAdd.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewAddEvent(userToAdd.ID())
+	if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -54,8 +57,11 @@ func (s *UserService) Delete(userToDelete rbac.User) error {
 		return err
 	}
 
-	if err := s.eventService.PublishEvent(UserEventTopic, event.NewDeleteEvent(userToDelete.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewDeleteEvent(userToDelete.ID())
+	if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -68,8 +74,11 @@ func (s *UserService) Update(userToUpdate rbac.User) error {
 		return err
 	}
 
-	if err := s.eventService.PublishEvent(UserEventTopic, event.NewUpdateEvent(userToUpdate.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewUpdateEvent(userToUpdate.ID())
+	if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -126,8 +135,11 @@ func (s *RoleService) Add(roleToAdd rbac.Role) error {
 		return err
 	}
 
-	if err := s.eventService.PublishEvent(RoleEventTopic, event.NewAddEvent(roleToAdd.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewAddEvent(roleToAdd.ID())
+	if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -140,10 +152,12 @@ func (s *RoleService) Delete(roleToDelete rbac.Role) error {
 		return err
 	}
 
-	if err := s.eventService.PublishEvent(RoleEventTopic, event.NewDeleteEvent(roleToDelete.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewDeleteEvent(roleToDelete.ID())
+	if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
-
 	return nil
 }
 
@@ -154,8 +168,11 @@ func (s *RoleService) Update(roleToUpdate rbac.Role) error {
 		return err
 	}
 
-	if err := s.eventService.PublishEvent(RoleEventTopic, event.NewUpdateEvent(roleToUpdate.ID())); err != nil {
-		log.Error(err)
+	pubEvent := event.NewUpdateEvent(roleToUpdate.ID())
+	if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil {
+		if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
diff --git a/controller/topology/nodes/nodeService.go b/controller/topology/nodes/nodeService.go
index 76e2de209..9de0bc3b3 100644
--- a/controller/topology/nodes/nodeService.go
+++ b/controller/topology/nodes/nodeService.go
@@ -60,8 +60,11 @@ func (n *NodeService) createNode(node Node) (Node, error) {
 		return node, err
 	}
 
-	if err := n.eventService.PublishEvent(NodeEventTopic, event.NewAddEvent(node.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewAddEvent(node.ID)
+	if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil {
+		if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return node, nil
@@ -74,8 +77,11 @@ func (n *NodeService) Update(node Node) error {
 		return err
 	}
 
-	if err := n.eventService.PublishEvent(NodeEventTopic, event.NewUpdateEvent(node.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewUpdateEvent(node.ID)
+	if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil {
+		if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -88,8 +94,11 @@ func (n *NodeService) Delete(node Node) error {
 		return err
 	}
 
-	if err := n.eventService.PublishEvent(NodeEventTopic, event.NewDeleteEvent(node.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewDeleteEvent(node.ID)
+	if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil {
+		if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
diff --git a/controller/topology/ports/portService.go b/controller/topology/ports/portService.go
index 1625155a0..1f8f99847 100644
--- a/controller/topology/ports/portService.go
+++ b/controller/topology/ports/portService.go
@@ -58,8 +58,11 @@ func (p *PortService) createPort(port Port) (Port, error) {
 		return port, err
 	}
 
-	if err := p.eventService.PublishEvent(PortEventTopic, event.NewAddEvent(port.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewAddEvent(port.ID)
+	if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil {
+		if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return port, nil
@@ -72,8 +75,11 @@ func (p *PortService) Update(port Port) error {
 		return err
 	}
 
-	if err := p.eventService.PublishEvent(PortEventTopic, event.NewUpdateEvent(port.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewUpdateEvent(port.ID)
+	if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil {
+		if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -86,8 +92,11 @@ func (p *PortService) Delete(port Port) error {
 		return err
 	}
 
-	if err := p.eventService.PublishEvent(PortEventTopic, event.NewDeleteEvent(port.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewDeleteEvent(port.ID)
+	if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil {
+		if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
diff --git a/controller/topology/routing-tables/routingTableService.go b/controller/topology/routing-tables/routingTableService.go
index eec4efb2f..43c13f1d7 100644
--- a/controller/topology/routing-tables/routingTableService.go
+++ b/controller/topology/routing-tables/routingTableService.go
@@ -68,8 +68,11 @@ func (r *RoutingTableService) createRoutingTable(routingTable RoutingTable) (Rou
 		return routingTable, err
 	}
 
-	if err := r.eventService.PublishEvent(RoutingTableEventTopic, event.NewAddEvent(routingTable.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewAddEvent(routingTable.ID)
+	if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil {
+		if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return routingTable, nil
@@ -82,8 +85,11 @@ func (r *RoutingTableService) Update(routingTable RoutingTable) error {
 		return err
 	}
 
-	if err := r.eventService.PublishEvent(RoutingTableEventTopic, event.NewUpdateEvent(routingTable.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewUpdateEvent(routingTable.ID)
+	if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil {
+		if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -96,8 +102,11 @@ func (r *RoutingTableService) Delete(routingTable RoutingTable) error {
 		return err
 	}
 
-	if err := r.eventService.PublishEvent(RoutingTableEventTopic, event.NewDeleteEvent(routingTable.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewDeleteEvent(routingTable.ID)
+	if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil {
+		if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
diff --git a/controller/topology/topologyService.go b/controller/topology/topologyService.go
index 694910843..41546f085 100644
--- a/controller/topology/topologyService.go
+++ b/controller/topology/topologyService.go
@@ -74,8 +74,11 @@ func (t *service) AddLink(link links.Link) error {
 		return err
 	}
 
-	if err := t.eventService.PublishEvent(LinkEventTopic, event.NewAddEvent(link.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewAddEvent(link.ID)
+	if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil {
+		if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -88,8 +91,11 @@ func (t *service) UpdateLink(link links.Link) error {
 		return err
 	}
 
-	if err := t.eventService.PublishEvent(LinkEventTopic, event.NewUpdateEvent(link.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewUpdateEvent(link.ID)
+	if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil {
+		if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
@@ -104,8 +110,11 @@ func (t *service) DeleteLink(link links.Link) error {
 		return err
 	}
 
-	if err := t.eventService.PublishEvent(LinkEventTopic, event.NewDeleteEvent(link.ID)); err != nil {
-		log.Error(err)
+	pubEvent := event.NewDeleteEvent(link.ID)
+	if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil {
+		if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil {
+			log.Error(retryErr)
+		}
 	}
 
 	return nil
diff --git a/go.mod b/go.mod
index 0f802294d..968323405 100644
--- a/go.mod
+++ b/go.mod
@@ -77,6 +77,7 @@ require (
 	github.com/prometheus/procfs v0.8.0 // indirect
 	github.com/rabbitmq/amqp091-go v1.4.0
 	github.com/rivo/uniseg v0.2.0 // indirect
+	github.com/sethvargo/go-retry v0.2.3
 	github.com/spf13/afero v1.9.2 // indirect
 	github.com/spf13/cast v1.5.0 // indirect
 	github.com/spf13/jwalterweatherman v1.1.0 // indirect
diff --git a/go.sum b/go.sum
index fdb18fa57..0b4b959c6 100644
--- a/go.sum
+++ b/go.sum
@@ -33,12 +33,8 @@ cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1
 cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
 cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
 cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
-cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
-cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
 cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
 cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
-cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
-cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
 cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
 cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
 cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
@@ -866,6 +862,8 @@ github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
 github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
 github.com/sethvargo/go-password v0.2.0 h1:BTDl4CC/gjf/axHMaDQtw507ogrXLci6XRiLc7i/UHI=
 github.com/sethvargo/go-password v0.2.0/go.mod h1:Ym4Mr9JXLBycr02MFuVQ/0JHidNetSgbzutTr3zsYXE=
+github.com/sethvargo/go-retry v0.2.3 h1:oYlgvIvsju3jNbottWABtbnoLC+GDtLdBHxKWxQm/iU=
+github.com/sethvargo/go-retry v0.2.3/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
 github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
-- 
GitLab