From ffe16f8c744048e764e070751d54cea7b3377285 Mon Sep 17 00:00:00 2001 From: Fabian Seidl <fabian.seidl@h-da.de> Date: Tue, 18 Feb 2025 09:11:50 +0000 Subject: [PATCH 1/5] multiple consumers possible --- application-framework/event/eventService.go | 27 +++++++++++++++++++-- controller/eventService/Service.go | 23 +++++++++--------- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go index 9edf6b709..f51bb086e 100644 --- a/application-framework/event/eventService.go +++ b/application-framework/event/eventService.go @@ -77,15 +77,38 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) { // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic // and writes them into a channel. func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error { + err := e.channel.ExchangeDeclare( + topic, // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + } + q, err := e.channel.QueueDeclare( - topic, // name + "", // name false, // durable false, // delete when unused - false, // exclusive + true, // exclusive false, // no-wait nil, // arguments ) + if err != nil { + return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + } + err = e.channel.QueueBind( + q.Name, // queue name + "", // routing key + topic, // exchange + false, + nil, + ) if err != nil { return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} } diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go index 330cb195b..e3429d78a 100644 --- a/controller/eventService/Service.go +++ b/controller/eventService/Service.go @@ -170,13 +170,14 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) { // PublishEvent declares a queue and publishes events. func (e *EventService) PublishEvent(topic string, event event.Event) error { - q, err := e.channel.QueueDeclare( - topic, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + err := e.channel.ExchangeDeclare( + topic, // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) if err != nil { return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err} @@ -189,10 +190,10 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { err = e.channel.PublishWithContext( context.TODO(), - "", // exchange - q.Name, // routing key - false, // mandatory - false, // immediate + topic, // exchange + "", // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: eventBody, -- GitLab From f2f07be6b319f0b7cb4ba2512901a3c49eb216ae Mon Sep 17 00:00:00 2001 From: Fabian Seidl <fabian.seidl@h-da.de> Date: Tue, 18 Feb 2025 09:11:50 +0000 Subject: [PATCH 2/5] multiple consumers possible --- application-framework/event/eventService.go | 27 +++++++++++++++++++-- controller/eventService/Service.go | 23 +++++++++--------- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go index 9edf6b709..f51bb086e 100644 --- a/application-framework/event/eventService.go +++ b/application-framework/event/eventService.go @@ -77,15 +77,38 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) { // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic // and writes them into a channel. func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error { + err := e.channel.ExchangeDeclare( + topic, // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + } + q, err := e.channel.QueueDeclare( - topic, // name + "", // name false, // durable false, // delete when unused - false, // exclusive + true, // exclusive false, // no-wait nil, // arguments ) + if err != nil { + return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + } + err = e.channel.QueueBind( + q.Name, // queue name + "", // routing key + topic, // exchange + false, + nil, + ) if err != nil { return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} } diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go index 330cb195b..e3429d78a 100644 --- a/controller/eventService/Service.go +++ b/controller/eventService/Service.go @@ -170,13 +170,14 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) { // PublishEvent declares a queue and publishes events. func (e *EventService) PublishEvent(topic string, event event.Event) error { - q, err := e.channel.QueueDeclare( - topic, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + err := e.channel.ExchangeDeclare( + topic, // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) if err != nil { return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err} @@ -189,10 +190,10 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { err = e.channel.PublishWithContext( context.TODO(), - "", // exchange - q.Name, // routing key - false, // mandatory - false, // immediate + topic, // exchange + "", // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: eventBody, -- GitLab From 88b2ba2eda92784bb3b379a48f20b24399af7fb0 Mon Sep 17 00:00:00 2001 From: Fabian Seidl <fabian.seidl@h-da.de> Date: Tue, 18 Feb 2025 12:57:56 +0000 Subject: [PATCH 3/5] possible fix for integration test --- .../application_tests/application_test.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/integration-tests/application_tests/application_test.go b/integration-tests/application_tests/application_test.go index 9b3797a55..ee8ec34f2 100644 --- a/integration-tests/application_tests/application_test.go +++ b/integration-tests/application_tests/application_test.go @@ -94,17 +94,6 @@ func TestMain(m *testing.M) { } go application.Run(eventTypeCallbackTuples) - // This is needed to clear the go channel of the messages sent by RabbitMQ when creating - // and logging in with the admin user. - // Important note: only works once after starting the setup, because first time use creates - // a user and role and update the user because of the login. After then only logins are done, no user and role creations. - // This means that this will block after trying once, because of the three attempts to read from eventChannels. - - <-application.addEventChannel - <-application.addEventChannel - <-application.addEventChannel - <-application.updateEventChannel - m.Run() } -- GitLab From 4aa8252e8dafc099246dc30f9b81eb3a12e86a50 Mon Sep 17 00:00:00 2001 From: Fabian Seidl <fabian.seidl@h-da.de> Date: Tue, 18 Feb 2025 13:26:22 +0000 Subject: [PATCH 4/5] update descriptions, add proper ctx --- application-framework/event/eventService.go | 6 +++--- controller/eventService/Service.go | 7 +++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go index f51bb086e..930a848d2 100644 --- a/application-framework/event/eventService.go +++ b/application-framework/event/eventService.go @@ -87,7 +87,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error nil, // arguments ) if err != nil { - return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err} } q, err := e.channel.QueueDeclare( @@ -99,7 +99,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error nil, // arguments ) if err != nil { - return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err} } err = e.channel.QueueBind( @@ -110,7 +110,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error nil, ) if err != nil { - return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + return &customerrs.AMQPInitFailError{Action: "failed binding to queue to exchange", Err: err} } msgs, err := e.channel.Consume( diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go index e3429d78a..a3041b8bd 100644 --- a/controller/eventService/Service.go +++ b/controller/eventService/Service.go @@ -180,7 +180,7 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { nil, // arguments ) if err != nil { - return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err} + return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err} } eventBody, err := json.Marshal(event) @@ -188,8 +188,11 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { return &customerrs.CouldNotMarshallError{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err} } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = e.channel.PublishWithContext( - context.TODO(), + ctx, topic, // exchange "", // routing key false, // mandatory -- GitLab From 149199bafeb1eb23705c339946220a5af51c3469 Mon Sep 17 00:00:00 2001 From: Fabian Seidl <fabian.seidl@h-da.de> Date: Tue, 18 Feb 2025 13:28:23 +0000 Subject: [PATCH 5/5] fix description --- application-framework/event/eventService.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go index 930a848d2..76b42d602 100644 --- a/application-framework/event/eventService.go +++ b/application-framework/event/eventService.go @@ -110,7 +110,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error nil, ) if err != nil { - return &customerrs.AMQPInitFailError{Action: "failed binding to queue to exchange", Err: err} + return &customerrs.AMQPInitFailError{Action: "failed binding queue to exchange", Err: err} } msgs, err := e.channel.Consume( -- GitLab