From ffe16f8c744048e764e070751d54cea7b3377285 Mon Sep 17 00:00:00 2001 From: Fabian Seidl <fabian.seidl@h-da.de> Date: Tue, 18 Feb 2025 09:11:50 +0000 Subject: [PATCH] multiple consumers possible --- application-framework/event/eventService.go | 27 +++++++++++++++++++-- controller/eventService/Service.go | 23 +++++++++--------- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go index 9edf6b709..f51bb086e 100644 --- a/application-framework/event/eventService.go +++ b/application-framework/event/eventService.go @@ -77,15 +77,38 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) { // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic // and writes them into a channel. func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error { + err := e.channel.ExchangeDeclare( + topic, // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + } + q, err := e.channel.QueueDeclare( - topic, // name + "", // name false, // durable false, // delete when unused - false, // exclusive + true, // exclusive false, // no-wait nil, // arguments ) + if err != nil { + return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + } + err = e.channel.QueueBind( + q.Name, // queue name + "", // routing key + topic, // exchange + false, + nil, + ) if err != nil { return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} } diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go index 330cb195b..e3429d78a 100644 --- a/controller/eventService/Service.go +++ b/controller/eventService/Service.go @@ -170,13 +170,14 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) { // PublishEvent declares a queue and publishes events. func (e *EventService) PublishEvent(topic string, event event.Event) error { - q, err := e.channel.QueueDeclare( - topic, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + err := e.channel.ExchangeDeclare( + topic, // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) if err != nil { return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err} @@ -189,10 +190,10 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { err = e.channel.PublishWithContext( context.TODO(), - "", // exchange - q.Name, // routing key - false, // mandatory - false, // immediate + topic, // exchange + "", // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: eventBody, -- GitLab