Skip to content
Snippets Groups Projects
Commit 0a2203ec authored by Fabian Seidl's avatar Fabian Seidl
Browse files

added go routine to event publishing to prevent blocking

parent 6fd47d43
No related branches found
No related tags found
1 merge request!370Resolve "Error handling in event publishing via entity services could be improved"
Pipeline #111649 passed
...@@ -82,7 +82,12 @@ func (s *DeviceService) Add(deviceToAdd device.Device) error { ...@@ -82,7 +82,12 @@ func (s *DeviceService) Add(deviceToAdd device.Device) error {
pubEvent := event.NewAddEvent(deviceToAdd.ID()) pubEvent := event.NewAddEvent(deviceToAdd.ID())
if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -116,7 +121,12 @@ func (s *DeviceService) UpdateModel(deviceToUpdate device.Device, modelAsString ...@@ -116,7 +121,12 @@ func (s *DeviceService) UpdateModel(deviceToUpdate device.Device, modelAsString
pubEvent := event.NewUpdateEvent(deviceToUpdate.ID()) pubEvent := event.NewUpdateEvent(deviceToUpdate.ID())
if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -133,7 +143,12 @@ func (s *DeviceService) Update(deviceToUpdate device.Device) error { ...@@ -133,7 +143,12 @@ func (s *DeviceService) Update(deviceToUpdate device.Device) error {
pubEvent := event.NewUpdateEvent(deviceToUpdate.ID()) pubEvent := event.NewUpdateEvent(deviceToUpdate.ID())
if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -156,7 +171,12 @@ func (s *DeviceService) Delete(deviceToDelete device.Device) error { ...@@ -156,7 +171,12 @@ func (s *DeviceService) Delete(deviceToDelete device.Device) error {
pubEvent := event.NewDeleteEvent(deviceToDelete.ID()) pubEvent := event.NewDeleteEvent(deviceToDelete.ID())
if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
......
...@@ -75,7 +75,12 @@ func (s *SbiService) Add(sbiToAdd southbound.SouthboundInterface) error { ...@@ -75,7 +75,12 @@ func (s *SbiService) Add(sbiToAdd southbound.SouthboundInterface) error {
pubEvent := event.NewAddEvent(sbiToAdd.ID()) pubEvent := event.NewAddEvent(sbiToAdd.ID())
if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(SbiEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -99,7 +104,12 @@ func (s *SbiService) Delete(sbiToDelete southbound.SouthboundInterface) error { ...@@ -99,7 +104,12 @@ func (s *SbiService) Delete(sbiToDelete southbound.SouthboundInterface) error {
pubEvent := event.NewDeleteEvent(sbiToDelete.ID()) pubEvent := event.NewDeleteEvent(sbiToDelete.ID())
if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(SbiEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
......
...@@ -42,7 +42,12 @@ func (s *UserService) Add(userToAdd rbac.User) error { ...@@ -42,7 +42,12 @@ func (s *UserService) Add(userToAdd rbac.User) error {
pubEvent := event.NewAddEvent(userToAdd.ID()) pubEvent := event.NewAddEvent(userToAdd.ID())
if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(UserEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -59,7 +64,12 @@ func (s *UserService) Delete(userToDelete rbac.User) error { ...@@ -59,7 +64,12 @@ func (s *UserService) Delete(userToDelete rbac.User) error {
pubEvent := event.NewDeleteEvent(userToDelete.ID()) pubEvent := event.NewDeleteEvent(userToDelete.ID())
if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(UserEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -76,7 +86,12 @@ func (s *UserService) Update(userToUpdate rbac.User) error { ...@@ -76,7 +86,12 @@ func (s *UserService) Update(userToUpdate rbac.User) error {
pubEvent := event.NewUpdateEvent(userToUpdate.ID()) pubEvent := event.NewUpdateEvent(userToUpdate.ID())
if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(UserEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -137,7 +152,12 @@ func (s *RoleService) Add(roleToAdd rbac.Role) error { ...@@ -137,7 +152,12 @@ func (s *RoleService) Add(roleToAdd rbac.Role) error {
pubEvent := event.NewAddEvent(roleToAdd.ID()) pubEvent := event.NewAddEvent(roleToAdd.ID())
if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(RoleEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -154,7 +174,12 @@ func (s *RoleService) Delete(roleToDelete rbac.Role) error { ...@@ -154,7 +174,12 @@ func (s *RoleService) Delete(roleToDelete rbac.Role) error {
pubEvent := event.NewDeleteEvent(roleToDelete.ID()) pubEvent := event.NewDeleteEvent(roleToDelete.ID())
if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(RoleEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -170,7 +195,12 @@ func (s *RoleService) Update(roleToUpdate rbac.Role) error { ...@@ -170,7 +195,12 @@ func (s *RoleService) Update(roleToUpdate rbac.Role) error {
pubEvent := event.NewUpdateEvent(roleToUpdate.ID()) pubEvent := event.NewUpdateEvent(roleToUpdate.ID())
if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil {
if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = s.eventService.RetryPublish(RoleEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
......
...@@ -62,7 +62,12 @@ func (n *NodeService) createNode(node Node) (Node, error) { ...@@ -62,7 +62,12 @@ func (n *NodeService) createNode(node Node) (Node, error) {
pubEvent := event.NewAddEvent(node.ID) pubEvent := event.NewAddEvent(node.ID)
if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil {
if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = n.eventService.RetryPublish(NodeEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -79,7 +84,12 @@ func (n *NodeService) Update(node Node) error { ...@@ -79,7 +84,12 @@ func (n *NodeService) Update(node Node) error {
pubEvent := event.NewUpdateEvent(node.ID) pubEvent := event.NewUpdateEvent(node.ID)
if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil {
if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = n.eventService.RetryPublish(NodeEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -96,7 +106,12 @@ func (n *NodeService) Delete(node Node) error { ...@@ -96,7 +106,12 @@ func (n *NodeService) Delete(node Node) error {
pubEvent := event.NewDeleteEvent(node.ID) pubEvent := event.NewDeleteEvent(node.ID)
if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil {
if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = n.eventService.RetryPublish(NodeEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
......
...@@ -60,7 +60,12 @@ func (p *PortService) createPort(port Port) (Port, error) { ...@@ -60,7 +60,12 @@ func (p *PortService) createPort(port Port) (Port, error) {
pubEvent := event.NewAddEvent(port.ID) pubEvent := event.NewAddEvent(port.ID)
if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil {
if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = p.eventService.RetryPublish(PortEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -77,7 +82,12 @@ func (p *PortService) Update(port Port) error { ...@@ -77,7 +82,12 @@ func (p *PortService) Update(port Port) error {
pubEvent := event.NewUpdateEvent(port.ID) pubEvent := event.NewUpdateEvent(port.ID)
if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil {
if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = p.eventService.RetryPublish(PortEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -94,7 +104,12 @@ func (p *PortService) Delete(port Port) error { ...@@ -94,7 +104,12 @@ func (p *PortService) Delete(port Port) error {
pubEvent := event.NewDeleteEvent(port.ID) pubEvent := event.NewDeleteEvent(port.ID)
if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil {
if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = p.eventService.RetryPublish(PortEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
......
...@@ -70,7 +70,12 @@ func (r *RoutingTableService) createRoutingTable(routingTable RoutingTable) (Rou ...@@ -70,7 +70,12 @@ func (r *RoutingTableService) createRoutingTable(routingTable RoutingTable) (Rou
pubEvent := event.NewAddEvent(routingTable.ID) pubEvent := event.NewAddEvent(routingTable.ID)
if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil {
if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -87,7 +92,12 @@ func (r *RoutingTableService) Update(routingTable RoutingTable) error { ...@@ -87,7 +92,12 @@ func (r *RoutingTableService) Update(routingTable RoutingTable) error {
pubEvent := event.NewUpdateEvent(routingTable.ID) pubEvent := event.NewUpdateEvent(routingTable.ID)
if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil {
if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -104,7 +114,12 @@ func (r *RoutingTableService) Delete(routingTable RoutingTable) error { ...@@ -104,7 +114,12 @@ func (r *RoutingTableService) Delete(routingTable RoutingTable) error {
pubEvent := event.NewDeleteEvent(routingTable.ID) pubEvent := event.NewDeleteEvent(routingTable.ID)
if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil {
if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
......
...@@ -76,7 +76,12 @@ func (t *service) AddLink(link links.Link) error { ...@@ -76,7 +76,12 @@ func (t *service) AddLink(link links.Link) error {
pubEvent := event.NewAddEvent(link.ID) pubEvent := event.NewAddEvent(link.ID)
if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil {
if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = t.eventService.RetryPublish(LinkEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -93,7 +98,12 @@ func (t *service) UpdateLink(link links.Link) error { ...@@ -93,7 +98,12 @@ func (t *service) UpdateLink(link links.Link) error {
pubEvent := event.NewUpdateEvent(link.ID) pubEvent := event.NewUpdateEvent(link.ID)
if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil {
if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = t.eventService.RetryPublish(LinkEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
...@@ -112,7 +122,12 @@ func (t *service) DeleteLink(link links.Link) error { ...@@ -112,7 +122,12 @@ func (t *service) DeleteLink(link links.Link) error {
pubEvent := event.NewDeleteEvent(link.ID) pubEvent := event.NewDeleteEvent(link.ID)
if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil {
if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil { var retryErr error
go func() {
retryErr = t.eventService.RetryPublish(LinkEventTopic, pubEvent)
}()
if retryErr != nil {
log.Error(retryErr) log.Error(retryErr)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment