From eb785f99937c39dae73419d317708b1a3da71463 Mon Sep 17 00:00:00 2001
From: Fabian Seidl <fabian.seidl@h-da.de>
Date: Tue, 18 Feb 2025 13:39:30 +0000
Subject: [PATCH] Fix multiple rabbitmq consumers receiving all messages

See merge request danet/gosdn!1217
---
 application-framework/event/eventService.go   | 29 ++++++++++++++++--
 controller/eventService/Service.go            | 30 +++++++++++--------
 .../application_tests/application_test.go     | 11 -------
 3 files changed, 43 insertions(+), 27 deletions(-)

diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go
index 9edf6b709..76b42d602 100644
--- a/application-framework/event/eventService.go
+++ b/application-framework/event/eventService.go
@@ -77,17 +77,40 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) {
 // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic
 // and writes them into a channel.
 func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error {
+	err := e.channel.ExchangeDeclare(
+		topic,    // name
+		"fanout", // type
+		true,     // durable
+		false,    // auto-deleted
+		false,    // internal
+		false,    // no-wait
+		nil,      // arguments
+	)
+	if err != nil {
+		return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
+	}
+
 	q, err := e.channel.QueueDeclare(
-		topic, // name
+		"",    // name
 		false, // durable
 		false, // delete when unused
-		false, // exclusive
+		true,  // exclusive
 		false, // no-wait
 		nil,   // arguments
 	)
+	if err != nil {
+		return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
+	}
 
+	err = e.channel.QueueBind(
+		q.Name, // queue name
+		"",     // routing key
+		topic,  // exchange
+		false,
+		nil,
+	)
 	if err != nil {
-		return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
+		return &customerrs.AMQPInitFailError{Action: "failed binding queue to exchange", Err: err}
 	}
 
 	msgs, err := e.channel.Consume(
diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go
index 330cb195b..a3041b8bd 100644
--- a/controller/eventService/Service.go
+++ b/controller/eventService/Service.go
@@ -170,16 +170,17 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
 
 // PublishEvent declares a queue and publishes events.
 func (e *EventService) PublishEvent(topic string, event event.Event) error {
-	q, err := e.channel.QueueDeclare(
-		topic, // name
-		false, // durable
-		false, // delete when unused
-		false, // exclusive
-		false, // no-wait
-		nil,   // arguments
+	err := e.channel.ExchangeDeclare(
+		topic,    // name
+		"fanout", // type
+		true,     // durable
+		false,    // auto-deleted
+		false,    // internal
+		false,    // no-wait
+		nil,      // arguments
 	)
 	if err != nil {
-		return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
+		return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
 	}
 
 	eventBody, err := json.Marshal(event)
@@ -187,12 +188,15 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
 		return &customerrs.CouldNotMarshallError{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err}
 	}
 
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
 	err = e.channel.PublishWithContext(
-		context.TODO(),
-		"",     // exchange
-		q.Name, // routing key
-		false,  // mandatory
-		false,  // immediate
+		ctx,
+		topic, // exchange
+		"",    // routing key
+		false, // mandatory
+		false, // immediate
 		amqp.Publishing{
 			ContentType: "application/json",
 			Body:        eventBody,
diff --git a/integration-tests/application_tests/application_test.go b/integration-tests/application_tests/application_test.go
index 9b3797a55..ee8ec34f2 100644
--- a/integration-tests/application_tests/application_test.go
+++ b/integration-tests/application_tests/application_test.go
@@ -94,17 +94,6 @@ func TestMain(m *testing.M) {
 	}
 	go application.Run(eventTypeCallbackTuples)
 
-	// This is needed to clear the go channel of the messages sent by RabbitMQ when creating
-	// and logging in with the admin user.
-	// Important note: only works once after starting the setup, because first time use creates
-	// a user and role and update the user because of the login. After then only logins are done, no user and role creations.
-	// This means that this will block after trying once, because of the three attempts to read from eventChannels.
-
-	<-application.addEventChannel
-	<-application.addEventChannel
-	<-application.addEventChannel
-	<-application.updateEventChannel
-
 	m.Run()
 }
 
-- 
GitLab