diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go index 0ace2760fd5a9dfe890b21566adffcb179dd9d5d..b21df9c7507f8bfd5c2dc01b66310e371aecc592 100644 --- a/controller/eventService/Service.go +++ b/controller/eventService/Service.go @@ -23,7 +23,8 @@ type EventService struct { } const ( - rePublishAttempts = 5 + // Max number of tries to republish a failed message + rePublishAttempts = 10 ) // NewEventService creates a new connection to the broker and opens a channel for later usage. @@ -88,7 +89,7 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { // RetryPublish is used to retry publishing an event after a failed attempt. func (e *EventService) RetryPublish(topic string, event event.Event) error { ctx := context.Background() - backOff := retry.NewConstant(2 * time.Second) + backOff := retry.NewFibonacci(time.Second) if err := retry.Do(ctx, retry.WithMaxRetries(rePublishAttempts, backOff), func(ctx context.Context) error { if pubErr := e.PublishEvent(topic, event); pubErr != nil { diff --git a/controller/nucleus/deviceService.go b/controller/nucleus/deviceService.go index e6aad402f96afa4efae770f8f13eaaed42e9b29d..2dacf748effc6e8c082caee7b9e8903a935c0cef 100644 --- a/controller/nucleus/deviceService.go +++ b/controller/nucleus/deviceService.go @@ -82,14 +82,12 @@ func (s *DeviceService) Add(deviceToAdd device.Device) error { pubEvent := event.NewAddEvent(deviceToAdd.ID()) if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -121,14 +119,12 @@ func (s *DeviceService) UpdateModel(deviceToUpdate device.Device, modelAsString pubEvent := event.NewUpdateEvent(deviceToUpdate.ID()) if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -143,14 +139,12 @@ func (s *DeviceService) Update(deviceToUpdate device.Device) error { pubEvent := event.NewUpdateEvent(deviceToUpdate.ID()) if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -171,14 +165,12 @@ func (s *DeviceService) Delete(deviceToDelete device.Device) error { pubEvent := event.NewDeleteEvent(deviceToDelete.ID()) if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil diff --git a/controller/nucleus/sbiService.go b/controller/nucleus/sbiService.go index 8a2442c0b1f9026583e1c1947089d260921f12e3..6d3f6259c0b45e2fe5b8e41ab49b4b292c44cfb7 100644 --- a/controller/nucleus/sbiService.go +++ b/controller/nucleus/sbiService.go @@ -75,14 +75,12 @@ func (s *SbiService) Add(sbiToAdd southbound.SouthboundInterface) error { pubEvent := event.NewAddEvent(sbiToAdd.ID()) if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(SbiEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -104,14 +102,12 @@ func (s *SbiService) Delete(sbiToDelete southbound.SouthboundInterface) error { pubEvent := event.NewDeleteEvent(sbiToDelete.ID()) if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(SbiEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil diff --git a/controller/rbac/rbacService.go b/controller/rbac/rbacService.go index 19c5d25577cba5f406e2c3565b5751735bcedae8..f999674dae60f6396a134091d6bdca525fab453d 100644 --- a/controller/rbac/rbacService.go +++ b/controller/rbac/rbacService.go @@ -42,14 +42,12 @@ func (s *UserService) Add(userToAdd rbac.User) error { pubEvent := event.NewAddEvent(userToAdd.ID()) if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(UserEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -64,14 +62,12 @@ func (s *UserService) Delete(userToDelete rbac.User) error { pubEvent := event.NewDeleteEvent(userToDelete.ID()) if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(UserEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -86,14 +82,12 @@ func (s *UserService) Update(userToUpdate rbac.User) error { pubEvent := event.NewUpdateEvent(userToUpdate.ID()) if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(UserEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -152,14 +146,12 @@ func (s *RoleService) Add(roleToAdd rbac.Role) error { pubEvent := event.NewAddEvent(roleToAdd.ID()) if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(RoleEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -174,14 +166,12 @@ func (s *RoleService) Delete(roleToDelete rbac.Role) error { pubEvent := event.NewDeleteEvent(roleToDelete.ID()) if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(RoleEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil } @@ -195,14 +185,12 @@ func (s *RoleService) Update(roleToUpdate rbac.Role) error { pubEvent := event.NewUpdateEvent(roleToUpdate.ID()) if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = s.eventService.RetryPublish(RoleEventTopic, pubEvent) + retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil diff --git a/controller/topology/nodes/nodeService.go b/controller/topology/nodes/nodeService.go index c77e804213bb80b60f2e23a26ad09a42c7c24e76..b72dbbc5c8f0d7f455fbc10cff0f9822499ed473 100644 --- a/controller/topology/nodes/nodeService.go +++ b/controller/topology/nodes/nodeService.go @@ -62,14 +62,12 @@ func (n *NodeService) createNode(node Node) (Node, error) { pubEvent := event.NewAddEvent(node.ID) if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = n.eventService.RetryPublish(NodeEventTopic, pubEvent) + retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return node, nil @@ -84,14 +82,12 @@ func (n *NodeService) Update(node Node) error { pubEvent := event.NewUpdateEvent(node.ID) if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = n.eventService.RetryPublish(NodeEventTopic, pubEvent) + retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -106,14 +102,12 @@ func (n *NodeService) Delete(node Node) error { pubEvent := event.NewDeleteEvent(node.ID) if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = n.eventService.RetryPublish(NodeEventTopic, pubEvent) + retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil diff --git a/controller/topology/ports/portService.go b/controller/topology/ports/portService.go index 971a98eb8f01e89acc0c2e58f3b4e9b278677b81..28ad38a80b91baefafe4b4458aec32b21e3cadd4 100644 --- a/controller/topology/ports/portService.go +++ b/controller/topology/ports/portService.go @@ -60,14 +60,12 @@ func (p *PortService) createPort(port Port) (Port, error) { pubEvent := event.NewAddEvent(port.ID) if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = p.eventService.RetryPublish(PortEventTopic, pubEvent) + retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return port, nil @@ -82,14 +80,12 @@ func (p *PortService) Update(port Port) error { pubEvent := event.NewUpdateEvent(port.ID) if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = p.eventService.RetryPublish(PortEventTopic, pubEvent) + retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -104,14 +100,12 @@ func (p *PortService) Delete(port Port) error { pubEvent := event.NewDeleteEvent(port.ID) if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = p.eventService.RetryPublish(PortEventTopic, pubEvent) + retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil diff --git a/controller/topology/routing-tables/routingTableService.go b/controller/topology/routing-tables/routingTableService.go index eefebff39105307d5f33a7c498ec5fa7119aa1f5..0e2cba26d2e8968576f3e5326dc0e87986dfbf89 100644 --- a/controller/topology/routing-tables/routingTableService.go +++ b/controller/topology/routing-tables/routingTableService.go @@ -70,14 +70,12 @@ func (r *RoutingTableService) createRoutingTable(routingTable RoutingTable) (Rou pubEvent := event.NewAddEvent(routingTable.ID) if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return routingTable, nil @@ -92,14 +90,12 @@ func (r *RoutingTableService) Update(routingTable RoutingTable) error { pubEvent := event.NewUpdateEvent(routingTable.ID) if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -114,14 +110,12 @@ func (r *RoutingTableService) Delete(routingTable RoutingTable) error { pubEvent := event.NewDeleteEvent(routingTable.ID) if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil diff --git a/controller/topology/topologyService.go b/controller/topology/topologyService.go index 6bda2fd5c19a07322d91d4357dc3f3b5da7f1fa0..f0743fbf82b6aefae9a264a20c0d53e8b2410097 100644 --- a/controller/topology/topologyService.go +++ b/controller/topology/topologyService.go @@ -76,14 +76,12 @@ func (t *service) AddLink(link links.Link) error { pubEvent := event.NewAddEvent(link.ID) if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = t.eventService.RetryPublish(LinkEventTopic, pubEvent) + retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -98,14 +96,12 @@ func (t *service) UpdateLink(link links.Link) error { pubEvent := event.NewUpdateEvent(link.ID) if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = t.eventService.RetryPublish(LinkEventTopic, pubEvent) + retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil @@ -122,14 +118,12 @@ func (t *service) DeleteLink(link links.Link) error { pubEvent := event.NewDeleteEvent(link.ID) if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { - var retryErr error go func() { - retryErr = t.eventService.RetryPublish(LinkEventTopic, pubEvent) + retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent) + if retryErr != nil { + log.Error(retryErr) + } }() - - if retryErr != nil { - log.Error(retryErr) - } } return nil