From 0a2203ec2fb20bb63059ca683e4b06bee1da5b4e Mon Sep 17 00:00:00 2001 From: Fabian Seidl <fabian.seidl@h-da.de> Date: Mon, 22 Aug 2022 12:12:09 +0200 Subject: [PATCH] added go routine to event publishing to prevent blocking --- controller/nucleus/deviceService.go | 28 +++++++++++-- controller/nucleus/sbiService.go | 14 ++++++- controller/rbac/rbacService.go | 42 ++++++++++++++++--- controller/topology/nodes/nodeService.go | 21 ++++++++-- controller/topology/ports/portService.go | 21 ++++++++-- .../routing-tables/routingTableService.go | 21 ++++++++-- controller/topology/topologyService.go | 21 ++++++++-- 7 files changed, 144 insertions(+), 24 deletions(-) diff --git a/controller/nucleus/deviceService.go b/controller/nucleus/deviceService.go index c96496f39..e6aad402f 100644 --- a/controller/nucleus/deviceService.go +++ b/controller/nucleus/deviceService.go @@ -82,7 +82,12 @@ func (s *DeviceService) Add(deviceToAdd device.Device) error { pubEvent := event.NewAddEvent(deviceToAdd.ID()) if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -116,7 +121,12 @@ func (s *DeviceService) UpdateModel(deviceToUpdate device.Device, modelAsString pubEvent := event.NewUpdateEvent(deviceToUpdate.ID()) if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -133,7 +143,12 @@ func (s *DeviceService) Update(deviceToUpdate device.Device) error { pubEvent := event.NewUpdateEvent(deviceToUpdate.ID()) if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -156,7 +171,12 @@ func (s *DeviceService) Delete(deviceToDelete device.Device) error { pubEvent := event.NewDeleteEvent(deviceToDelete.ID()) if err := s.eventService.PublishEvent(DeviceEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(DeviceEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(DeviceEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } diff --git a/controller/nucleus/sbiService.go b/controller/nucleus/sbiService.go index 50d4f4e77..8a2442c0b 100644 --- a/controller/nucleus/sbiService.go +++ b/controller/nucleus/sbiService.go @@ -75,7 +75,12 @@ func (s *SbiService) Add(sbiToAdd southbound.SouthboundInterface) error { pubEvent := event.NewAddEvent(sbiToAdd.ID()) if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(SbiEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -99,7 +104,12 @@ func (s *SbiService) Delete(sbiToDelete southbound.SouthboundInterface) error { pubEvent := event.NewDeleteEvent(sbiToDelete.ID()) if err := s.eventService.PublishEvent(SbiEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(SbiEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(SbiEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } diff --git a/controller/rbac/rbacService.go b/controller/rbac/rbacService.go index 37499c35b..19c5d2557 100644 --- a/controller/rbac/rbacService.go +++ b/controller/rbac/rbacService.go @@ -42,7 +42,12 @@ func (s *UserService) Add(userToAdd rbac.User) error { pubEvent := event.NewAddEvent(userToAdd.ID()) if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(UserEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -59,7 +64,12 @@ func (s *UserService) Delete(userToDelete rbac.User) error { pubEvent := event.NewDeleteEvent(userToDelete.ID()) if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(UserEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -76,7 +86,12 @@ func (s *UserService) Update(userToUpdate rbac.User) error { pubEvent := event.NewUpdateEvent(userToUpdate.ID()) if err := s.eventService.PublishEvent(UserEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(UserEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(UserEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -137,7 +152,12 @@ func (s *RoleService) Add(roleToAdd rbac.Role) error { pubEvent := event.NewAddEvent(roleToAdd.ID()) if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(RoleEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -154,7 +174,12 @@ func (s *RoleService) Delete(roleToDelete rbac.Role) error { pubEvent := event.NewDeleteEvent(roleToDelete.ID()) if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(RoleEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -170,7 +195,12 @@ func (s *RoleService) Update(roleToUpdate rbac.Role) error { pubEvent := event.NewUpdateEvent(roleToUpdate.ID()) if err := s.eventService.PublishEvent(RoleEventTopic, pubEvent); err != nil { - if retryErr := s.eventService.RetryPublish(RoleEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = s.eventService.RetryPublish(RoleEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } diff --git a/controller/topology/nodes/nodeService.go b/controller/topology/nodes/nodeService.go index 9de0bc3b3..c77e80421 100644 --- a/controller/topology/nodes/nodeService.go +++ b/controller/topology/nodes/nodeService.go @@ -62,7 +62,12 @@ func (n *NodeService) createNode(node Node) (Node, error) { pubEvent := event.NewAddEvent(node.ID) if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { - if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = n.eventService.RetryPublish(NodeEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -79,7 +84,12 @@ func (n *NodeService) Update(node Node) error { pubEvent := event.NewUpdateEvent(node.ID) if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { - if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = n.eventService.RetryPublish(NodeEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -96,7 +106,12 @@ func (n *NodeService) Delete(node Node) error { pubEvent := event.NewDeleteEvent(node.ID) if err := n.eventService.PublishEvent(NodeEventTopic, pubEvent); err != nil { - if retryErr := n.eventService.RetryPublish(NodeEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = n.eventService.RetryPublish(NodeEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } diff --git a/controller/topology/ports/portService.go b/controller/topology/ports/portService.go index 1f8f99847..971a98eb8 100644 --- a/controller/topology/ports/portService.go +++ b/controller/topology/ports/portService.go @@ -60,7 +60,12 @@ func (p *PortService) createPort(port Port) (Port, error) { pubEvent := event.NewAddEvent(port.ID) if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { - if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = p.eventService.RetryPublish(PortEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -77,7 +82,12 @@ func (p *PortService) Update(port Port) error { pubEvent := event.NewUpdateEvent(port.ID) if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { - if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = p.eventService.RetryPublish(PortEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -94,7 +104,12 @@ func (p *PortService) Delete(port Port) error { pubEvent := event.NewDeleteEvent(port.ID) if err := p.eventService.PublishEvent(PortEventTopic, pubEvent); err != nil { - if retryErr := p.eventService.RetryPublish(PortEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = p.eventService.RetryPublish(PortEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } diff --git a/controller/topology/routing-tables/routingTableService.go b/controller/topology/routing-tables/routingTableService.go index 43c13f1d7..eefebff39 100644 --- a/controller/topology/routing-tables/routingTableService.go +++ b/controller/topology/routing-tables/routingTableService.go @@ -70,7 +70,12 @@ func (r *RoutingTableService) createRoutingTable(routingTable RoutingTable) (Rou pubEvent := event.NewAddEvent(routingTable.ID) if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { - if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -87,7 +92,12 @@ func (r *RoutingTableService) Update(routingTable RoutingTable) error { pubEvent := event.NewUpdateEvent(routingTable.ID) if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { - if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -104,7 +114,12 @@ func (r *RoutingTableService) Delete(routingTable RoutingTable) error { pubEvent := event.NewDeleteEvent(routingTable.ID) if err := r.eventService.PublishEvent(RoutingTableEventTopic, pubEvent); err != nil { - if retryErr := r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = r.eventService.RetryPublish(RoutingTableEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } diff --git a/controller/topology/topologyService.go b/controller/topology/topologyService.go index 41546f085..6bda2fd5c 100644 --- a/controller/topology/topologyService.go +++ b/controller/topology/topologyService.go @@ -76,7 +76,12 @@ func (t *service) AddLink(link links.Link) error { pubEvent := event.NewAddEvent(link.ID) if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { - if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = t.eventService.RetryPublish(LinkEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -93,7 +98,12 @@ func (t *service) UpdateLink(link links.Link) error { pubEvent := event.NewUpdateEvent(link.ID) if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { - if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = t.eventService.RetryPublish(LinkEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } @@ -112,7 +122,12 @@ func (t *service) DeleteLink(link links.Link) error { pubEvent := event.NewDeleteEvent(link.ID) if err := t.eventService.PublishEvent(LinkEventTopic, pubEvent); err != nil { - if retryErr := t.eventService.RetryPublish(LinkEventTopic, pubEvent); retryErr != nil { + var retryErr error + go func() { + retryErr = t.eventService.RetryPublish(LinkEventTopic, pubEvent) + }() + + if retryErr != nil { log.Error(retryErr) } } -- GitLab