diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go index 8710655152b4db340ebfbebc2dd51d5d032370e3..e891e4ad95ae677fbddccd156cbb2107d763f00e 100644 --- a/controller/eventService/Service.go +++ b/controller/eventService/Service.go @@ -3,6 +3,7 @@ package eventservice import ( "context" "encoding/json" + "time" "code.fbi.h-da.de/danet/gosdn/controller/config" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" @@ -11,6 +12,7 @@ import ( interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" amqp "github.com/rabbitmq/amqp091-go" + "github.com/sethvargo/go-retry" log "github.com/sirupsen/logrus" ) @@ -20,6 +22,10 @@ type EventService struct { channel *amqp.Channel } +const ( + rePublishAttempts = 5 +) + // NewEventService creates a new connection to the broker and opens a channel for later usage. func NewEventService() (interfaces.Service, error) { // TODO: This is an fugly hack to mitigate that some tests are trying to connect to an actual broker. (staester) @@ -79,6 +85,22 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { return nil } +func (e *EventService) RetryPublish(topic string, event event.Event) error { + ctx := context.Background() + backOff := retry.NewConstant(2 * time.Second) + + if err := retry.Do(ctx, retry.WithMaxRetries(rePublishAttempts, backOff), func(ctx context.Context) error { + if pubErr := e.PublishEvent(topic, event); pubErr != nil { + return retry.RetryableError(&customerrs.AMQPMessageFailError{Action: "retrying to publish message failed", Err: pubErr}) + } + return nil + }); err != nil { + return err + } + + return nil +} + // CloseConnection closes an exisiting connection. func (e *EventService) CloseConnection() { if err := e.connection.Close(); err != nil { diff --git a/controller/eventService/utils.go b/controller/eventService/utils.go index 891142fef2e7bf809daca4885fb58921d4e4f9e3..400018196d0dd406c56b25fad039118ceef1b110 100644 --- a/controller/eventService/utils.go +++ b/controller/eventService/utils.go @@ -33,3 +33,10 @@ func (e *MockEventService) PublishEvent(topic string, event event.Event) error { // CloseConnection closes an exisiting connection. func (e *MockEventService) CloseConnection() { } + +// RetryPublish declares a queue and publishes events. +func (e *MockEventService) RetryPublish(topic string, event event.Event) error { + e.Queue[topic] = append(e.Queue[topic], event) + + return nil +} diff --git a/controller/interfaces/event/service.go b/controller/interfaces/event/service.go index 73f3c97d2dc3a30e45804671fc6bbc827017d0e5..41ec92361ddaaddd87d486cd1f936604ebff1f0e 100644 --- a/controller/interfaces/event/service.go +++ b/controller/interfaces/event/service.go @@ -6,4 +6,5 @@ import "code.fbi.h-da.de/danet/gosdn/controller/event" type Service interface { PublishEvent(topic string, event event.Event) error CloseConnection() + RetryPublish(topic string, event event.Event) error } diff --git a/controller/mocks/Service.go b/controller/mocks/Service.go index 188cd0c9c1e94bec7b2a665f26387dbd0085a7fa..de81b8be67e30949057a4e743a0e243f080b355b 100644 --- a/controller/mocks/Service.go +++ b/controller/mocks/Service.go @@ -3,10 +3,9 @@ package mocks import ( - device "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" - mock "github.com/stretchr/testify/mock" + controllerevent "code.fbi.h-da.de/danet/gosdn/controller/event" - store "code.fbi.h-da.de/danet/gosdn/controller/store" + mock "github.com/stretchr/testify/mock" ) // Service is an autogenerated mock type for the Service type @@ -14,87 +13,18 @@ type Service struct { mock.Mock } -// Add provides a mock function with given fields: _a0 -func (_m *Service) Add(_a0 device.Device) error { - ret := _m.Called(_a0) - - var r0 error - if rf, ok := ret.Get(0).(func(device.Device) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Delete provides a mock function with given fields: _a0 -func (_m *Service) Delete(_a0 device.Device) error { - ret := _m.Called(_a0) - - var r0 error - if rf, ok := ret.Get(0).(func(device.Device) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Get provides a mock function with given fields: _a0 -func (_m *Service) Get(_a0 store.Query) (device.Device, error) { - ret := _m.Called(_a0) - - var r0 device.Device - if rf, ok := ret.Get(0).(func(store.Query) device.Device); ok { - r0 = rf(_a0) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(device.Device) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(store.Query) error); ok { - r1 = rf(_a0) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// GetAll provides a mock function with given fields: -func (_m *Service) GetAll() ([]device.Device, error) { - ret := _m.Called() - - var r0 []device.Device - if rf, ok := ret.Get(0).(func() []device.Device); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]device.Device) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 +// CloseConnection provides a mock function with given fields: +func (_m *Service) CloseConnection() { + _m.Called() } -// Update provides a mock function with given fields: _a0 -func (_m *Service) Update(_a0 device.Device) error { - ret := _m.Called(_a0) +// PublishEvent provides a mock function with given fields: topic, _a1 +func (_m *Service) PublishEvent(topic string, _a1 controllerevent.Event) error { + ret := _m.Called(topic, _a1) var r0 error - if rf, ok := ret.Get(0).(func(device.Device) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(string, controllerevent.Event) error); ok { + r0 = rf(topic, _a1) } else { r0 = ret.Error(0) } @@ -102,13 +32,13 @@ func (_m *Service) Update(_a0 device.Device) error { return r0 } -// UpdateModel provides a mock function with given fields: _a0, _a1 -func (_m *Service) UpdateModel(_a0 device.Device, _a1 string) error { - ret := _m.Called(_a0, _a1) +// RetryPublish provides a mock function with given fields: topic, _a1 +func (_m *Service) RetryPublish(topic string, _a1 controllerevent.Event) error { + ret := _m.Called(topic, _a1) var r0 error - if rf, ok := ret.Get(0).(func(device.Device, string) error); ok { - r0 = rf(_a0, _a1) + if rf, ok := ret.Get(0).(func(string, controllerevent.Event) error); ok { + r0 = rf(topic, _a1) } else { r0 = ret.Error(0) } diff --git a/controller/mocks/Store.go b/controller/mocks/Store.go index f8f71cd7da2b2719058248e965e8a591a1d3f764..6ce0fd98513a962eeea46f362bd24474bcdc99db 100644 --- a/controller/mocks/Store.go +++ b/controller/mocks/Store.go @@ -3,8 +3,8 @@ package mocks import ( - southbound "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" - store "code.fbi.h-da.de/danet/gosdn/controller/store" + store "code.fbi.h-da.de/danet/gosdn/controller/interfaces/store" + uuid "github.com/google/uuid" mock "github.com/stretchr/testify/mock" ) @@ -13,13 +13,13 @@ type Store struct { mock.Mock } -// Add provides a mock function with given fields: _a0 -func (_m *Store) Add(_a0 southbound.SouthboundInterface) error { - ret := _m.Called(_a0) +// Add provides a mock function with given fields: item +func (_m *Store) Add(item store.Storable) error { + ret := _m.Called(item) var r0 error - if rf, ok := ret.Get(0).(func(southbound.SouthboundInterface) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(store.Storable) error); ok { + r0 = rf(item) } else { r0 = ret.Error(0) } @@ -27,13 +27,13 @@ func (_m *Store) Add(_a0 southbound.SouthboundInterface) error { return r0 } -// Delete provides a mock function with given fields: _a0 -func (_m *Store) Delete(_a0 southbound.SouthboundInterface) error { - ret := _m.Called(_a0) +// Delete provides a mock function with given fields: id +func (_m *Store) Delete(id uuid.UUID) error { + ret := _m.Called(id) var r0 error - if rf, ok := ret.Get(0).(func(southbound.SouthboundInterface) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(uuid.UUID) error); ok { + r0 = rf(id) } else { r0 = ret.Error(0) } @@ -41,20 +41,36 @@ func (_m *Store) Delete(_a0 southbound.SouthboundInterface) error { return r0 } -// Get provides a mock function with given fields: _a0 -func (_m *Store) Get(_a0 store.Query) (southbound.LoadedSbi, error) { - ret := _m.Called(_a0) +// Exists provides a mock function with given fields: id +func (_m *Store) Exists(id uuid.UUID) bool { + ret := _m.Called(id) - var r0 southbound.LoadedSbi - if rf, ok := ret.Get(0).(func(store.Query) southbound.LoadedSbi); ok { - r0 = rf(_a0) + var r0 bool + if rf, ok := ret.Get(0).(func(uuid.UUID) bool); ok { + r0 = rf(id) } else { - r0 = ret.Get(0).(southbound.LoadedSbi) + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Get provides a mock function with given fields: id +func (_m *Store) Get(id uuid.UUID) (store.Storable, error) { + ret := _m.Called(id) + + var r0 store.Storable + if rf, ok := ret.Get(0).(func(uuid.UUID) store.Storable); ok { + r0 = rf(id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.Storable) + } } var r1 error - if rf, ok := ret.Get(1).(func(store.Query) error); ok { - r1 = rf(_a0) + if rf, ok := ret.Get(1).(func(uuid.UUID) error); ok { + r1 = rf(id) } else { r1 = ret.Error(1) } @@ -62,27 +78,20 @@ func (_m *Store) Get(_a0 store.Query) (southbound.LoadedSbi, error) { return r0, r1 } -// GetAll provides a mock function with given fields: -func (_m *Store) GetAll() ([]southbound.LoadedSbi, error) { +// UUIDs provides a mock function with given fields: +func (_m *Store) UUIDs() []uuid.UUID { ret := _m.Called() - var r0 []southbound.LoadedSbi - if rf, ok := ret.Get(0).(func() []southbound.LoadedSbi); ok { + var r0 []uuid.UUID + if rf, ok := ret.Get(0).(func() []uuid.UUID); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]southbound.LoadedSbi) + r0 = ret.Get(0).([]uuid.UUID) } } - var r1 error - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 + return r0 } type mockConstructorTestingTNewStore interface { diff --git a/controller/nucleus/deviceService.go b/controller/nucleus/deviceService.go index 7e2e00c8a3fde190d87bfba43e697d53d0d8b5b2..c96496f3940f5eaff25180d552166ee112b54721 100644 --- a/controller/nucleus/deviceService.go +++ b/controller/nucleus/deviceService.go @@ -80,8 +80,11 @@ func (s *DeviceService) Add(deviceToAdd device.Device) error { return err } - if err := s.eventService.PublishEvent(DeviceEventTopic, event.NewAddEvent(deviceToAdd.ID())); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(deviceToAdd.ID()) + if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -111,8 +114,11 @@ func (s *DeviceService) UpdateModel(deviceToUpdate device.Device, modelAsString return err } - if err := s.eventService.PublishEvent(DeviceEventTopic, event.NewUpdateEvent(deviceToUpdate.ID())); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(deviceToUpdate.ID()) + if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -125,8 +131,11 @@ func (s *DeviceService) Update(deviceToUpdate device.Device) error { return err } - if err := s.eventService.PublishEvent(DeviceEventTopic, event.NewUpdateEvent(deviceToUpdate.ID())); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(deviceToUpdate.ID()) + if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -145,8 +154,11 @@ func (s *DeviceService) Delete(deviceToDelete device.Device) error { } } - if err := s.eventService.PublishEvent(DeviceEventTopic, event.NewDeleteEvent(deviceToDelete.ID())); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(deviceToDelete.ID()) + if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil diff --git a/controller/nucleus/sbiService.go b/controller/nucleus/sbiService.go index 8c1902431e900c177b99baaed58cabba778e59dc..50d4f4e77ac55d6bc8525fac6f022a807f680b92 100644 --- a/controller/nucleus/sbiService.go +++ b/controller/nucleus/sbiService.go @@ -73,8 +73,11 @@ func (s *SbiService) Add(sbiToAdd southbound.SouthboundInterface) error { return err } - if err := s.eventService.PublishEvent(SbiEventTopic, event.NewAddEvent(sbiToAdd.ID())); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(sbiToAdd.ID()) + if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -94,8 +97,11 @@ func (s *SbiService) Delete(sbiToDelete southbound.SouthboundInterface) error { } } - if err := s.eventService.PublishEvent(SbiEventTopic, event.NewDeleteEvent(sbiToDelete.ID())); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(sbiToDelete.ID()) + if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil diff --git a/controller/rbac/rbacService.go b/controller/rbac/rbacService.go index a2b4a288ca6cabd9f72d0989575b9cf8aba9a54b..37499c35b60cea21bbfb6c5b122ed95311c1f900 100644 --- a/controller/rbac/rbacService.go +++ b/controller/rbac/rbacService.go @@ -40,8 +40,11 @@ func (s *UserService) Add(userToAdd rbac.User) error { return err } - if err := s.eventService.PublishEvent(UserEventTopic, event.NewAddEvent(userToAdd.ID())); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(userToAdd.ID()) + if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -54,8 +57,11 @@ func (s *UserService) Delete(userToDelete rbac.User) error { return err } - if err := s.eventService.PublishEvent(UserEventTopic, event.NewDeleteEvent(userToDelete.ID())); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(userToDelete.ID()) + if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -68,8 +74,11 @@ func (s *UserService) Update(userToUpdate rbac.User) error { return err } - if err := s.eventService.PublishEvent(UserEventTopic, event.NewUpdateEvent(userToUpdate.ID())); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(userToUpdate.ID()) + if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -126,8 +135,11 @@ func (s *RoleService) Add(roleToAdd rbac.Role) error { return err } - if err := s.eventService.PublishEvent(RoleEventTopic, event.NewAddEvent(roleToAdd.ID())); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(roleToAdd.ID()) + if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -140,10 +152,12 @@ func (s *RoleService) Delete(roleToDelete rbac.Role) error { return err } - if err := s.eventService.PublishEvent(RoleEventTopic, event.NewDeleteEvent(roleToDelete.ID())); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(roleToDelete.ID()) + if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } - return nil } @@ -154,8 +168,11 @@ func (s *RoleService) Update(roleToUpdate rbac.Role) error { return err } - if err := s.eventService.PublishEvent(RoleEventTopic, event.NewUpdateEvent(roleToUpdate.ID())); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(roleToUpdate.ID()) + if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { + if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil diff --git a/controller/topology/nodes/nodeService.go b/controller/topology/nodes/nodeService.go index 76e2de209bc13eafecaf5989178433a405ea531b..9de0bc3b3d843dc4e4f2c2457fdff261b2f6506d 100644 --- a/controller/topology/nodes/nodeService.go +++ b/controller/topology/nodes/nodeService.go @@ -60,8 +60,11 @@ func (n *NodeService) createNode(node Node) (Node, error) { return node, err } - if err := n.eventService.PublishEvent(NodeEventTopic, event.NewAddEvent(node.ID)); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(node.ID) + if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { + if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return node, nil @@ -74,8 +77,11 @@ func (n *NodeService) Update(node Node) error { return err } - if err := n.eventService.PublishEvent(NodeEventTopic, event.NewUpdateEvent(node.ID)); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(node.ID) + if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { + if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -88,8 +94,11 @@ func (n *NodeService) Delete(node Node) error { return err } - if err := n.eventService.PublishEvent(NodeEventTopic, event.NewDeleteEvent(node.ID)); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(node.ID) + if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { + if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil diff --git a/controller/topology/ports/portService.go b/controller/topology/ports/portService.go index 1625155a0cb1f4578be24b70186fd36622b17290..1f8f99847a3ac0262d35c72a4f4cc9ccb51edc72 100644 --- a/controller/topology/ports/portService.go +++ b/controller/topology/ports/portService.go @@ -58,8 +58,11 @@ func (p *PortService) createPort(port Port) (Port, error) { return port, err } - if err := p.eventService.PublishEvent(PortEventTopic, event.NewAddEvent(port.ID)); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(port.ID) + if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { + if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return port, nil @@ -72,8 +75,11 @@ func (p *PortService) Update(port Port) error { return err } - if err := p.eventService.PublishEvent(PortEventTopic, event.NewUpdateEvent(port.ID)); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(port.ID) + if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { + if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -86,8 +92,11 @@ func (p *PortService) Delete(port Port) error { return err } - if err := p.eventService.PublishEvent(PortEventTopic, event.NewDeleteEvent(port.ID)); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(port.ID) + if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { + if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil diff --git a/controller/topology/routing-tables/routingTableService.go b/controller/topology/routing-tables/routingTableService.go index eec4efb2fc4f7cfd9fd0f2c7fa01a5a2bff47955..43c13f1d718f254ab331cdeff9a7560d0e1620e5 100644 --- a/controller/topology/routing-tables/routingTableService.go +++ b/controller/topology/routing-tables/routingTableService.go @@ -68,8 +68,11 @@ func (r *RoutingTableService) createRoutingTable(routingTable RoutingTable) (Rou return routingTable, err } - if err := r.eventService.PublishEvent(RoutingTableEventTopic, event.NewAddEvent(routingTable.ID)); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(routingTable.ID) + if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { + if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return routingTable, nil @@ -82,8 +85,11 @@ func (r *RoutingTableService) Update(routingTable RoutingTable) error { return err } - if err := r.eventService.PublishEvent(RoutingTableEventTopic, event.NewUpdateEvent(routingTable.ID)); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(routingTable.ID) + if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { + if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -96,8 +102,11 @@ func (r *RoutingTableService) Delete(routingTable RoutingTable) error { return err } - if err := r.eventService.PublishEvent(RoutingTableEventTopic, event.NewDeleteEvent(routingTable.ID)); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(routingTable.ID) + if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { + if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil diff --git a/controller/topology/topologyService.go b/controller/topology/topologyService.go index 6949108435b88caf15e0b7b6e65bc9c8453018a8..41546f085e33667b43996df946e0f5cfb31fd08b 100644 --- a/controller/topology/topologyService.go +++ b/controller/topology/topologyService.go @@ -74,8 +74,11 @@ func (t *service) AddLink(link links.Link) error { return err } - if err := t.eventService.PublishEvent(LinkEventTopic, event.NewAddEvent(link.ID)); err != nil { - log.Error(err) + pubEvent := event.NewAddEvent(link.ID) + if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { + if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -88,8 +91,11 @@ func (t *service) UpdateLink(link links.Link) error { return err } - if err := t.eventService.PublishEvent(LinkEventTopic, event.NewUpdateEvent(link.ID)); err != nil { - log.Error(err) + pubEvent := event.NewUpdateEvent(link.ID) + if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { + if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil @@ -104,8 +110,11 @@ func (t *service) DeleteLink(link links.Link) error { return err } - if err := t.eventService.PublishEvent(LinkEventTopic, event.NewDeleteEvent(link.ID)); err != nil { - log.Error(err) + pubEvent := event.NewDeleteEvent(link.ID) + if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { + if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil { + log.Error(retryErr) + } } return nil diff --git a/go.mod b/go.mod index 0f802294d2f2fa1d4b87b810999c2e69c3c362d6..96832340527fb2e276790b86347497b971aa9c11 100644 --- a/go.mod +++ b/go.mod @@ -77,6 +77,7 @@ require ( github.com/prometheus/procfs v0.8.0 // indirect github.com/rabbitmq/amqp091-go v1.4.0 github.com/rivo/uniseg v0.2.0 // indirect + github.com/sethvargo/go-retry v0.2.3 github.com/spf13/afero v1.9.2 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect diff --git a/go.sum b/go.sum index fdb18fa57836f850772f2a336e645b0e1cc81069..0b4b959c6f001f7bc317e200ddabe2f4b67bc521 100644 --- a/go.sum +++ b/go.sum @@ -33,12 +33,8 @@ cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1 cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= -cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= -cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= -cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= -cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= @@ -866,6 +862,8 @@ github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sethvargo/go-password v0.2.0 h1:BTDl4CC/gjf/axHMaDQtw507ogrXLci6XRiLc7i/UHI= github.com/sethvargo/go-password v0.2.0/go.mod h1:Ym4Mr9JXLBycr02MFuVQ/0JHidNetSgbzutTr3zsYXE= +github.com/sethvargo/go-retry v0.2.3 h1:oYlgvIvsju3jNbottWABtbnoLC+GDtLdBHxKWxQm/iU= +github.com/sethvargo/go-retry v0.2.3/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=