From ffe16f8c744048e764e070751d54cea7b3377285 Mon Sep 17 00:00:00 2001
From: Fabian Seidl <fabian.seidl@h-da.de>
Date: Tue, 18 Feb 2025 09:11:50 +0000
Subject: [PATCH 1/5] multiple consumers possible

---
 application-framework/event/eventService.go | 27 +++++++++++++++++++--
 controller/eventService/Service.go          | 23 +++++++++---------
 2 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go
index 9edf6b709..f51bb086e 100644
--- a/application-framework/event/eventService.go
+++ b/application-framework/event/eventService.go
@@ -77,15 +77,38 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) {
 // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic
 // and writes them into a channel.
 func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error {
+	err := e.channel.ExchangeDeclare(
+		topic,    // name
+		"fanout", // type
+		true,     // durable
+		false,    // auto-deleted
+		false,    // internal
+		false,    // no-wait
+		nil,      // arguments
+	)
+	if err != nil {
+		return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
+	}
+
 	q, err := e.channel.QueueDeclare(
-		topic, // name
+		"",    // name
 		false, // durable
 		false, // delete when unused
-		false, // exclusive
+		true,  // exclusive
 		false, // no-wait
 		nil,   // arguments
 	)
+	if err != nil {
+		return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
+	}
 
+	err = e.channel.QueueBind(
+		q.Name, // queue name
+		"",     // routing key
+		topic,  // exchange
+		false,
+		nil,
+	)
 	if err != nil {
 		return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
 	}
diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go
index 330cb195b..e3429d78a 100644
--- a/controller/eventService/Service.go
+++ b/controller/eventService/Service.go
@@ -170,13 +170,14 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
 
 // PublishEvent declares a queue and publishes events.
 func (e *EventService) PublishEvent(topic string, event event.Event) error {
-	q, err := e.channel.QueueDeclare(
-		topic, // name
-		false, // durable
-		false, // delete when unused
-		false, // exclusive
-		false, // no-wait
-		nil,   // arguments
+	err := e.channel.ExchangeDeclare(
+		topic,    // name
+		"fanout", // type
+		true,     // durable
+		false,    // auto-deleted
+		false,    // internal
+		false,    // no-wait
+		nil,      // arguments
 	)
 	if err != nil {
 		return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
@@ -189,10 +190,10 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
 
 	err = e.channel.PublishWithContext(
 		context.TODO(),
-		"",     // exchange
-		q.Name, // routing key
-		false,  // mandatory
-		false,  // immediate
+		topic, // exchange
+		"",    // routing key
+		false, // mandatory
+		false, // immediate
 		amqp.Publishing{
 			ContentType: "application/json",
 			Body:        eventBody,
-- 
GitLab


From f2f07be6b319f0b7cb4ba2512901a3c49eb216ae Mon Sep 17 00:00:00 2001
From: Fabian Seidl <fabian.seidl@h-da.de>
Date: Tue, 18 Feb 2025 09:11:50 +0000
Subject: [PATCH 2/5] multiple consumers possible

---
 application-framework/event/eventService.go | 27 +++++++++++++++++++--
 controller/eventService/Service.go          | 23 +++++++++---------
 2 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go
index 9edf6b709..f51bb086e 100644
--- a/application-framework/event/eventService.go
+++ b/application-framework/event/eventService.go
@@ -77,15 +77,38 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) {
 // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic
 // and writes them into a channel.
 func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error {
+	err := e.channel.ExchangeDeclare(
+		topic,    // name
+		"fanout", // type
+		true,     // durable
+		false,    // auto-deleted
+		false,    // internal
+		false,    // no-wait
+		nil,      // arguments
+	)
+	if err != nil {
+		return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
+	}
+
 	q, err := e.channel.QueueDeclare(
-		topic, // name
+		"",    // name
 		false, // durable
 		false, // delete when unused
-		false, // exclusive
+		true,  // exclusive
 		false, // no-wait
 		nil,   // arguments
 	)
+	if err != nil {
+		return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
+	}
 
+	err = e.channel.QueueBind(
+		q.Name, // queue name
+		"",     // routing key
+		topic,  // exchange
+		false,
+		nil,
+	)
 	if err != nil {
 		return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
 	}
diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go
index 330cb195b..e3429d78a 100644
--- a/controller/eventService/Service.go
+++ b/controller/eventService/Service.go
@@ -170,13 +170,14 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
 
 // PublishEvent declares a queue and publishes events.
 func (e *EventService) PublishEvent(topic string, event event.Event) error {
-	q, err := e.channel.QueueDeclare(
-		topic, // name
-		false, // durable
-		false, // delete when unused
-		false, // exclusive
-		false, // no-wait
-		nil,   // arguments
+	err := e.channel.ExchangeDeclare(
+		topic,    // name
+		"fanout", // type
+		true,     // durable
+		false,    // auto-deleted
+		false,    // internal
+		false,    // no-wait
+		nil,      // arguments
 	)
 	if err != nil {
 		return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
@@ -189,10 +190,10 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
 
 	err = e.channel.PublishWithContext(
 		context.TODO(),
-		"",     // exchange
-		q.Name, // routing key
-		false,  // mandatory
-		false,  // immediate
+		topic, // exchange
+		"",    // routing key
+		false, // mandatory
+		false, // immediate
 		amqp.Publishing{
 			ContentType: "application/json",
 			Body:        eventBody,
-- 
GitLab


From 88b2ba2eda92784bb3b379a48f20b24399af7fb0 Mon Sep 17 00:00:00 2001
From: Fabian Seidl <fabian.seidl@h-da.de>
Date: Tue, 18 Feb 2025 12:57:56 +0000
Subject: [PATCH 3/5] possible fix for integration test

---
 .../application_tests/application_test.go             | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/integration-tests/application_tests/application_test.go b/integration-tests/application_tests/application_test.go
index 9b3797a55..ee8ec34f2 100644
--- a/integration-tests/application_tests/application_test.go
+++ b/integration-tests/application_tests/application_test.go
@@ -94,17 +94,6 @@ func TestMain(m *testing.M) {
 	}
 	go application.Run(eventTypeCallbackTuples)
 
-	// This is needed to clear the go channel of the messages sent by RabbitMQ when creating
-	// and logging in with the admin user.
-	// Important note: only works once after starting the setup, because first time use creates
-	// a user and role and update the user because of the login. After then only logins are done, no user and role creations.
-	// This means that this will block after trying once, because of the three attempts to read from eventChannels.
-
-	<-application.addEventChannel
-	<-application.addEventChannel
-	<-application.addEventChannel
-	<-application.updateEventChannel
-
 	m.Run()
 }
 
-- 
GitLab


From 4aa8252e8dafc099246dc30f9b81eb3a12e86a50 Mon Sep 17 00:00:00 2001
From: Fabian Seidl <fabian.seidl@h-da.de>
Date: Tue, 18 Feb 2025 13:26:22 +0000
Subject: [PATCH 4/5] update descriptions, add proper ctx

---
 application-framework/event/eventService.go | 6 +++---
 controller/eventService/Service.go          | 7 +++++--
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go
index f51bb086e..930a848d2 100644
--- a/application-framework/event/eventService.go
+++ b/application-framework/event/eventService.go
@@ -87,7 +87,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error
 		nil,      // arguments
 	)
 	if err != nil {
-		return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
+		return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
 	}
 
 	q, err := e.channel.QueueDeclare(
@@ -99,7 +99,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error
 		nil,   // arguments
 	)
 	if err != nil {
-		return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
+		return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
 	}
 
 	err = e.channel.QueueBind(
@@ -110,7 +110,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error
 		nil,
 	)
 	if err != nil {
-		return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
+		return &customerrs.AMQPInitFailError{Action: "failed binding to queue to exchange", Err: err}
 	}
 
 	msgs, err := e.channel.Consume(
diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go
index e3429d78a..a3041b8bd 100644
--- a/controller/eventService/Service.go
+++ b/controller/eventService/Service.go
@@ -180,7 +180,7 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
 		nil,      // arguments
 	)
 	if err != nil {
-		return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
+		return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
 	}
 
 	eventBody, err := json.Marshal(event)
@@ -188,8 +188,11 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
 		return &customerrs.CouldNotMarshallError{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err}
 	}
 
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
 	err = e.channel.PublishWithContext(
-		context.TODO(),
+		ctx,
 		topic, // exchange
 		"",    // routing key
 		false, // mandatory
-- 
GitLab


From 149199bafeb1eb23705c339946220a5af51c3469 Mon Sep 17 00:00:00 2001
From: Fabian Seidl <fabian.seidl@h-da.de>
Date: Tue, 18 Feb 2025 13:28:23 +0000
Subject: [PATCH 5/5] fix description

---
 application-framework/event/eventService.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go
index 930a848d2..76b42d602 100644
--- a/application-framework/event/eventService.go
+++ b/application-framework/event/eventService.go
@@ -110,7 +110,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error
 		nil,
 	)
 	if err != nil {
-		return &customerrs.AMQPInitFailError{Action: "failed binding to queue to exchange", Err: err}
+		return &customerrs.AMQPInitFailError{Action: "failed binding queue to exchange", Err: err}
 	}
 
 	msgs, err := e.channel.Consume(
-- 
GitLab