diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go index 9edf6b7096a22f59b17dbd6092db15df8874df55..f51bb086e989073f46ed06dab537d26f0266576a 100644 --- a/application-framework/event/eventService.go +++ b/application-framework/event/eventService.go @@ -77,15 +77,38 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) { // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic // and writes them into a channel. func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error { + err := e.channel.ExchangeDeclare( + topic, // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + } + q, err := e.channel.QueueDeclare( - topic, // name + "", // name false, // durable false, // delete when unused - false, // exclusive + true, // exclusive false, // no-wait nil, // arguments ) + if err != nil { + return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} + } + err = e.channel.QueueBind( + q.Name, // queue name + "", // routing key + topic, // exchange + false, + nil, + ) if err != nil { return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} } diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go index 330cb195b861999cc6641da0a60228816323460c..e3429d78ad7920de8b68d112fd268c959f786927 100644 --- a/controller/eventService/Service.go +++ b/controller/eventService/Service.go @@ -170,13 +170,14 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) { // PublishEvent declares a queue and publishes events. func (e *EventService) PublishEvent(topic string, event event.Event) error { - q, err := e.channel.QueueDeclare( - topic, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + err := e.channel.ExchangeDeclare( + topic, // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) if err != nil { return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err} @@ -189,10 +190,10 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { err = e.channel.PublishWithContext( context.TODO(), - "", // exchange - q.Name, // routing key - false, // mandatory - false, // immediate + topic, // exchange + "", // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: eventBody,